Stxxl  1.3.2
sort_stream.h
1 /***************************************************************************
2  * include/stxxl/bits/stream/sort_stream.h
3  *
4  * Part of the STXXL. See http://stxxl.sourceforge.net
5  *
6  * Copyright (C) 2002-2005 Roman Dementiev <[email protected]>
7  * Copyright (C) 2006-2008 Johannes Singler <[email protected]>
8  * Copyright (C) 2008-2010 Andreas Beckmann <[email protected]>
9  *
10  * Distributed under the Boost Software License, Version 1.0.
11  * (See accompanying file LICENSE_1_0.txt or copy at
12  * http://www.boost.org/LICENSE_1_0.txt)
13  **************************************************************************/
14 
15 #ifndef STXXL_SORT_STREAM_HEADER
16 #define STXXL_SORT_STREAM_HEADER
17 
18 #ifdef STXXL_BOOST_CONFIG
19  #include <boost/config.hpp>
20 #endif
21 
22 #include <stxxl/bits/stream/stream.h>
23 #include <stxxl/bits/mng/mng.h>
24 #include <stxxl/bits/algo/sort_base.h>
25 #include <stxxl/bits/algo/sort_helper.h>
26 #include <stxxl/bits/algo/adaptor.h>
27 #include <stxxl/bits/algo/run_cursor.h>
28 #include <stxxl/bits/algo/losertree.h>
29 #include <stxxl/bits/stream/sorted_runs.h>
30 
31 
32 __STXXL_BEGIN_NAMESPACE
33 
34 namespace stream
35 {
38 
39 
41  // CREATE RUNS //
43 
50  template <
51  class Input_,
52  class Cmp_,
53  unsigned BlockSize_ = STXXL_DEFAULT_BLOCK_SIZE(typename Input_::value_type),
54  class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY>
55  class basic_runs_creator : private noncopyable
56  {
57  protected:
58  Input_ & input;
59  Cmp_ cmp;
60 
61  public:
62  typedef Cmp_ cmp_type;
63  typedef typename Input_::value_type value_type;
 64  typedef typed_block<BlockSize_, value_type> block_type;
 65  typedef sort_helper::trigger_entry<block_type> trigger_entry_type;
 66  typedef sorted_runs<trigger_entry_type> sorted_runs_type;
67 
68  private:
69  typedef typename sorted_runs_type::run_type run_type;
70  sorted_runs_type result_; // stores the result (sorted runs)
71  unsigned_type m_; // memory for internal use in blocks
72  bool result_computed; // true iff result is already computed (used in 'result' method)
73 
75  unsigned_type fetch(block_type * blocks, unsigned_type first_idx, unsigned_type last_idx)
76  {
77  typename element_iterator_traits<block_type>::element_iterator output =
78  make_element_iterator(blocks, first_idx);
79  unsigned_type curr_idx = first_idx;
80  while (!input.empty() && curr_idx != last_idx) {
81  *output = *input;
82  ++input;
83  ++output;
84  ++curr_idx;
85  }
86  return curr_idx;
87  }
88 
89  void fill_with_max_value(block_type * blocks, unsigned_type num_blocks, unsigned_type first_idx)
90  {
91  unsigned_type last_idx = num_blocks * block_type::size;
92  if (first_idx < last_idx) {
93  typename element_iterator_traits<block_type>::element_iterator curr =
94  make_element_iterator(blocks, first_idx);
95  while (first_idx != last_idx) {
96  *curr = cmp.max_value();
97  ++curr;
98  ++first_idx;
99  }
100  }
101  }
102 
104  void sort_run(block_type * run, unsigned_type elements)
105  {
107  sort(make_element_iterator(run, 0),
108  make_element_iterator(run, elements),
109  cmp);
110  }
111 
112  void compute_result();
113 
114  public:
119  basic_runs_creator(Input_ & i, Cmp_ c, unsigned_type memory_to_use) :
120  input(i), cmp(c), m_(memory_to_use / BlockSize_ / sort_memory_usage_factor()), result_computed(false)
121  {
122  sort_helper::verify_sentinel_strict_weak_ordering(cmp);
123  if (!(2 * BlockSize_ * sort_memory_usage_factor() <= memory_to_use)) {
124  throw bad_parameter("stxxl::runs_creator<>:runs_creator(): INSUFFICIENT MEMORY provided, please increase parameter 'memory_to_use'");
125  }
126  assert(m_ > 0);
127  }
128 
 132  const sorted_runs_type & result()
 133  {
134  if (!result_computed)
135  {
136  compute_result();
137  result_computed = true;
138 #ifdef STXXL_PRINT_STAT_AFTER_RF
139  STXXL_MSG(*stats::get_instance());
140 #endif //STXXL_PRINT_STAT_AFTER_RF
141  }
142  return result_;
143  }
144  };
145 
149  template <class Input_, class Cmp_, unsigned BlockSize_, class AllocStr_>
150  void basic_runs_creator<Input_, Cmp_, BlockSize_, AllocStr_>::compute_result()
151  {
152  unsigned_type i = 0;
153  unsigned_type m2 = m_ / 2;
154  const unsigned_type el_in_run = m2 * block_type::size; // # el in a run
155  STXXL_VERBOSE1("basic_runs_creator::compute_result m2=" << m2);
156  unsigned_type blocks1_length = 0, blocks2_length = 0;
157  block_type * Blocks1 = NULL;
158 
159 #ifndef STXXL_SMALL_INPUT_PSORT_OPT
160  Blocks1 = new block_type[m2 * 2];
161 #else
162  while (!input.empty() && blocks1_length != block_type::size)
163  {
164  result_.small_.push_back(*input);
165  ++input;
166  ++blocks1_length;
167  }
168 
169  if (blocks1_length == block_type::size && !input.empty())
170  {
171  Blocks1 = new block_type[m2 * 2];
172  std::copy(result_.small_.begin(), result_.small_.end(), Blocks1[0].begin());
173  result_.small_.clear();
174  }
175  else
176  {
177  STXXL_VERBOSE1("basic_runs_creator: Small input optimization, input length: " << blocks1_length);
178  result_.elements = blocks1_length;
180  sort(result_.small_.begin(), result_.small_.end(), cmp);
181  return;
182  }
183 #endif //STXXL_SMALL_INPUT_PSORT_OPT
184 
185  // the first block may be there already
186  blocks1_length = fetch(Blocks1, blocks1_length, el_in_run);
187 
188  // sort first run
189  sort_run(Blocks1, blocks1_length);
190  if (blocks1_length <= block_type::size && input.empty())
191  {
192  // small input, do not flush it on the disk(s)
193  STXXL_VERBOSE1("basic_runs_creator: Small input optimization, input length: " << blocks1_length);
194  assert(result_.small_.empty());
195  result_.small_.insert(result_.small_.end(), Blocks1[0].begin(), Blocks1[0].begin() + blocks1_length);
196  result_.elements = blocks1_length;
197  delete[] Blocks1;
198  return;
199  }
200 
201  block_type * Blocks2 = Blocks1 + m2;
202  block_manager * bm = block_manager::get_instance();
203  request_ptr * write_reqs = new request_ptr[m2];
204  run_type run;
205 
206  unsigned_type cur_run_size = div_ceil(blocks1_length, block_type::size); // in blocks
207  run.resize(cur_run_size);
208  bm->new_blocks(AllocStr_(), make_bid_iterator(run.begin()), make_bid_iterator(run.end()));
209 
210  disk_queues::get_instance()->set_priority_op(request_queue::WRITE);
211 
212  // fill the rest of the last block with max values
213  fill_with_max_value(Blocks1, cur_run_size, blocks1_length);
214 
215  for (i = 0; i < cur_run_size; ++i)
216  {
217  run[i].value = Blocks1[i][0];
218  write_reqs[i] = Blocks1[i].write(run[i].bid);
219  }
220  result_.runs.push_back(run);
221  result_.runs_sizes.push_back(blocks1_length);
222  result_.elements += blocks1_length;
223 
224  if (input.empty())
225  {
226  // return
227  wait_all(write_reqs, write_reqs + cur_run_size);
228  delete[] write_reqs;
229  delete[] Blocks1;
230  return;
231  }
232 
233  STXXL_VERBOSE1("Filling the second part of the allocated blocks");
234  blocks2_length = fetch(Blocks2, 0, el_in_run);
235 
236  if (input.empty())
237  {
238  // optimization if the whole set fits into both halves
239  // (re)sort internally and return
240  blocks2_length += el_in_run;
 241  sort_run(Blocks1, blocks2_length); // sort first and second runs together
242  wait_all(write_reqs, write_reqs + cur_run_size);
243  bm->delete_blocks(make_bid_iterator(run.begin()), make_bid_iterator(run.end()));
244 
245  cur_run_size = div_ceil(blocks2_length, block_type::size);
246  run.resize(cur_run_size);
247  bm->new_blocks(AllocStr_(), make_bid_iterator(run.begin()), make_bid_iterator(run.end()));
248 
249  // fill the rest of the last block with max values
250  fill_with_max_value(Blocks1, cur_run_size, blocks2_length);
251 
252  assert(cur_run_size > m2);
253 
254  for (i = 0; i < m2; ++i)
255  {
256  run[i].value = Blocks1[i][0];
257  write_reqs[i]->wait();
258  write_reqs[i] = Blocks1[i].write(run[i].bid);
259  }
260 
261  request_ptr * write_reqs1 = new request_ptr[cur_run_size - m2];
262 
263  for ( ; i < cur_run_size; ++i)
264  {
265  run[i].value = Blocks1[i][0];
266  write_reqs1[i - m2] = Blocks1[i].write(run[i].bid);
267  }
268 
269  result_.runs[0] = run;
270  result_.runs_sizes[0] = blocks2_length;
271  result_.elements = blocks2_length;
272 
273  wait_all(write_reqs, write_reqs + m2);
274  delete[] write_reqs;
275  wait_all(write_reqs1, write_reqs1 + cur_run_size - m2);
276  delete[] write_reqs1;
277 
278  delete[] Blocks1;
279 
280  return;
281  }
282 
 283  // more than 2 runs can be filled, i.e. the general case
284 
285  sort_run(Blocks2, blocks2_length);
286 
287  cur_run_size = div_ceil(blocks2_length, block_type::size); // in blocks
288  run.resize(cur_run_size);
289  bm->new_blocks(AllocStr_(), make_bid_iterator(run.begin()), make_bid_iterator(run.end()));
290 
291  for (i = 0; i < cur_run_size; ++i)
292  {
293  run[i].value = Blocks2[i][0];
294  write_reqs[i]->wait();
295  write_reqs[i] = Blocks2[i].write(run[i].bid);
296  }
297  assert((blocks2_length % el_in_run) == 0);
298 
299  result_.runs.push_back(run);
300  result_.runs_sizes.push_back(blocks2_length);
301  result_.elements += blocks2_length;
302 
303  while (!input.empty())
304  {
305  blocks1_length = fetch(Blocks1, 0, el_in_run);
306  sort_run(Blocks1, blocks1_length);
307  cur_run_size = div_ceil(blocks1_length, block_type::size); // in blocks
308  run.resize(cur_run_size);
309  bm->new_blocks(AllocStr_(), make_bid_iterator(run.begin()), make_bid_iterator(run.end()));
310 
311  // fill the rest of the last block with max values (occurs only on the last run)
312  fill_with_max_value(Blocks1, cur_run_size, blocks1_length);
313 
314  for (i = 0; i < cur_run_size; ++i)
315  {
316  run[i].value = Blocks1[i][0];
317  write_reqs[i]->wait();
318  write_reqs[i] = Blocks1[i].write(run[i].bid);
319  }
320  result_.runs.push_back(run);
321  result_.runs_sizes.push_back(blocks1_length);
322  result_.elements += blocks1_length;
323 
324  std::swap(Blocks1, Blocks2);
325  std::swap(blocks1_length, blocks2_length);
326  }
327 
328  wait_all(write_reqs, write_reqs + m2);
329  delete[] write_reqs;
330  delete[] ((Blocks1 < Blocks2) ? Blocks1 : Blocks2);
331  }
332 
339  template <
340  class Input_,
341  class Cmp_,
342  unsigned BlockSize_ = STXXL_DEFAULT_BLOCK_SIZE(typename Input_::value_type),
343  class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY>
344  class runs_creator : public basic_runs_creator<Input_, Cmp_, BlockSize_, AllocStr_>
345  {
346  private:
 347  typedef basic_runs_creator<Input_, Cmp_, BlockSize_, AllocStr_> base;
 348 
349  public:
350  typedef typename base::block_type block_type;
351 
352  private:
353  using base::input;
354 
355  public:
360  runs_creator(Input_ & i, Cmp_ c, unsigned_type memory_to_use) : base(i, c, memory_to_use)
361  { }
362  };
363 
364 
372  template <class ValueType_>
373  struct use_push
374  {
375  typedef ValueType_ value_type;
376  };
377 
388  template <
389  class ValueType_,
390  class Cmp_,
391  unsigned BlockSize_,
392  class AllocStr_>
 393  class runs_creator<
 394  use_push<ValueType_>,
395  Cmp_,
396  BlockSize_,
397  AllocStr_>
398  : private noncopyable
399  {
400  Cmp_ cmp;
401 
402  public:
403  typedef Cmp_ cmp_type;
404  typedef ValueType_ value_type;
 405  typedef typed_block<BlockSize_, value_type> block_type;
 406  typedef sort_helper::trigger_entry<block_type> trigger_entry_type;
 407  typedef sorted_runs<trigger_entry_type> sorted_runs_type;
409 
410  private:
411  typedef typename sorted_runs_type::run_type run_type;
412  sorted_runs_type result_; // stores the result (sorted runs)
413  unsigned_type m_; // memory for internal use in blocks
414 
415  bool result_computed; // true after the result() method was called for the first time
416 
417  const unsigned_type m2;
418  const unsigned_type el_in_run;
419  unsigned_type cur_el;
420  block_type * Blocks1;
421  block_type * Blocks2;
422  request_ptr * write_reqs;
423  run_type run;
424 
425  void fill_with_max_value(block_type * blocks, unsigned_type num_blocks, unsigned_type first_idx)
426  {
427  unsigned_type last_idx = num_blocks * block_type::size;
428  if (first_idx < last_idx) {
429  typename element_iterator_traits<block_type>::element_iterator curr =
430  make_element_iterator(blocks, first_idx);
431  while (first_idx != last_idx) {
432  *curr = cmp.max_value();
433  ++curr;
434  ++first_idx;
435  }
436  }
437  }
438 
439  void sort_run(block_type * run, unsigned_type elements)
440  {
442  sort(make_element_iterator(run, 0),
443  make_element_iterator(run, elements),
444  cmp);
445  }
446 
447  void compute_result()
448  {
449  if (cur_el == 0)
450  return;
451 
452  unsigned_type cur_el_reg = cur_el;
453  sort_run(Blocks1, cur_el_reg);
454  result_.elements += cur_el_reg;
455  if (cur_el_reg <= block_type::size && result_.elements == cur_el_reg)
456  {
457  // small input, do not flush it on the disk(s)
458  STXXL_VERBOSE1("runs_creator(use_push): Small input optimization, input length: " << cur_el_reg);
459  result_.small_.resize(cur_el_reg);
460  std::copy(Blocks1[0].begin(), Blocks1[0].begin() + cur_el_reg, result_.small_.begin());
461  return;
462  }
463 
464  const unsigned_type cur_run_size = div_ceil(cur_el_reg, block_type::size); // in blocks
465  run.resize(cur_run_size);
466  block_manager * bm = block_manager::get_instance();
467  bm->new_blocks(AllocStr_(), make_bid_iterator(run.begin()), make_bid_iterator(run.end()));
468 
469  disk_queues::get_instance()->set_priority_op(request_queue::WRITE);
470 
471  result_.runs_sizes.push_back(cur_el_reg);
472 
473  // fill the rest of the last block with max values
474  fill_with_max_value(Blocks1, cur_run_size, cur_el_reg);
475 
476  unsigned_type i = 0;
477  for ( ; i < cur_run_size; ++i)
478  {
479  run[i].value = Blocks1[i][0];
480  if (write_reqs[i].get())
481  write_reqs[i]->wait();
482 
483  write_reqs[i] = Blocks1[i].write(run[i].bid);
484  }
485  result_.runs.push_back(run);
486 
487  for (i = 0; i < m2; ++i)
488  if (write_reqs[i].get())
489  write_reqs[i]->wait();
490  }
491 
492  void cleanup()
493  {
494  delete[] write_reqs;
495  delete[] ((Blocks1 < Blocks2) ? Blocks1 : Blocks2);
496  write_reqs = NULL;
497  Blocks1 = Blocks2 = NULL;
498  }
499 
500  public:
504  runs_creator(Cmp_ c, unsigned_type memory_to_use) :
505  cmp(c), m_(memory_to_use / BlockSize_ / sort_memory_usage_factor()), result_computed(false),
506  m2(m_ / 2),
507  el_in_run(m2 * block_type::size),
508  cur_el(0),
509  Blocks1(new block_type[m2 * 2]),
510  Blocks2(Blocks1 + m2),
511  write_reqs(new request_ptr[m2])
512  {
513  sort_helper::verify_sentinel_strict_weak_ordering(cmp);
514  if (!(2 * BlockSize_ * sort_memory_usage_factor() <= memory_to_use)) {
515  throw bad_parameter("stxxl::runs_creator<>:runs_creator(): INSUFFICIENT MEMORY provided, please increase parameter 'memory_to_use'");
516  }
517  assert(m2 > 0);
518  }
519 
520  ~runs_creator()
521  {
522  if (!result_computed)
523  cleanup();
524  }
525 
528  void push(const value_type & val)
529  {
530  assert(result_computed == false);
531  unsigned_type cur_el_reg = cur_el;
532  if (cur_el_reg < el_in_run)
533  {
534  Blocks1[cur_el_reg / block_type::size][cur_el_reg % block_type::size] = val;
535  ++cur_el;
536  return;
537  }
538 
539  assert(el_in_run == cur_el);
540  cur_el = 0;
541 
542  //sort and store Blocks1
543  sort_run(Blocks1, el_in_run);
544  result_.elements += el_in_run;
545 
546  const unsigned_type cur_run_size = div_ceil(el_in_run, block_type::size); // in blocks
547  run.resize(cur_run_size);
548  block_manager * bm = block_manager::get_instance();
549  bm->new_blocks(AllocStr_(), make_bid_iterator(run.begin()), make_bid_iterator(run.end()));
550 
551  disk_queues::get_instance()->set_priority_op(request_queue::WRITE);
552 
553  result_.runs_sizes.push_back(el_in_run);
554 
555  for (unsigned_type i = 0; i < cur_run_size; ++i)
556  {
557  run[i].value = Blocks1[i][0];
558  if (write_reqs[i].get())
559  write_reqs[i]->wait();
560 
561  write_reqs[i] = Blocks1[i].write(run[i].bid);
562  }
563 
564  result_.runs.push_back(run);
565 
566  std::swap(Blocks1, Blocks2);
567 
568  push(val);
569  }
570 
 574  const sorted_runs_type & result()
 575  {
576  if (1)
577  {
578  if (!result_computed)
579  {
580  compute_result();
581  result_computed = true;
582  cleanup();
583 #ifdef STXXL_PRINT_STAT_AFTER_RF
584  STXXL_MSG(*stats::get_instance());
585 #endif //STXXL_PRINT_STAT_AFTER_RF
586  }
587  }
588  return result_;
589  }
590  };
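Usage sketch (not part of the header): driving the push-based runs creator above with plain int keys. The comparator name int_cmp, the element count and the memory budget are illustrative assumptions; the comparator must supply the min_value()/max_value() sentinels checked by verify_sentinel_strict_weak_ordering, all pushed values must lie strictly between those sentinels, and a configured STXXL disk setup is assumed.

    #include <stxxl/bits/stream/sort_stream.h>
    #include <limits>

    // Comparator with the sentinel values required by the runs creator/merger.
    struct int_cmp
    {
        bool operator () (int a, int b) const { return a < b; }
        int min_value() const { return std::numeric_limits<int>::min(); }  // sentinel, never pushed
        int max_value() const { return std::numeric_limits<int>::max(); }  // sentinel, used as filler
    };

    void example_use_push()
    {
        const stxxl::unsigned_type memory = 64 * 1024 * 1024;   // 64 MiB for run formation

        typedef stxxl::stream::runs_creator<
            stxxl::stream::use_push<int>, int_cmp> creator_type;

        creator_type creator(int_cmp(), memory);
        for (int i = 1000000; i > 0; --i)
            creator.push(i);                                     // arbitrary input order

        // result() sorts and flushes the last (possibly partial) run
        const creator_type::sorted_runs_type & runs = creator.result();
        (void)runs;   // typically handed to stream::runs_merger, defined further below
    }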
591 
592 
599  template <class ValueType_>
 600  struct from_sorted_sequences
 601  {
602  typedef ValueType_ value_type;
603  };
604 
615  template <
616  class ValueType_,
617  class Cmp_,
618  unsigned BlockSize_,
619  class AllocStr_>
 620  class runs_creator<
 621  from_sorted_sequences<ValueType_>,
622  Cmp_,
623  BlockSize_,
624  AllocStr_>
625  : private noncopyable
626  {
627  typedef ValueType_ value_type;
 628  typedef typed_block<BlockSize_, value_type> block_type;
 629  typedef sort_helper::trigger_entry<block_type> trigger_entry_type;
630  typedef AllocStr_ alloc_strategy_type;
631  Cmp_ cmp;
632 
633  public:
634  typedef Cmp_ cmp_type;
 635  typedef sorted_runs<trigger_entry_type> sorted_runs_type;
 637 
638  private:
639  typedef typename sorted_runs_type::run_type run_type;
640 
641  sorted_runs_type result_; // stores the result (sorted runs)
642  unsigned_type m_; // memory for internal use in blocks
 643  buffered_writer<block_type> writer;
 644  block_type * cur_block;
645  unsigned_type offset;
646  unsigned_type iblock;
647  unsigned_type irun;
648  alloc_strategy_type alloc_strategy; // needs to be reset after each run
649 
650  public:
655  runs_creator(Cmp_ c, unsigned_type memory_to_use) :
656  cmp(c),
657  m_(memory_to_use / BlockSize_ / sort_memory_usage_factor()),
658  writer(m_, m_ / 2),
659  cur_block(writer.get_free_block()),
660  offset(0),
661  iblock(0),
662  irun(0)
663  {
664  sort_helper::verify_sentinel_strict_weak_ordering(cmp);
665  assert(m_ > 0);
666  if (!(2 * BlockSize_ * sort_memory_usage_factor() <= memory_to_use)) {
667  throw bad_parameter("stxxl::runs_creator<>:runs_creator(): INSUFFICIENT MEMORY provided, please increase parameter 'memory_to_use'");
668  }
669  }
670 
673  void push(const value_type & val)
674  {
675  assert(offset < block_type::size);
676 
677  (*cur_block)[offset] = val;
678  ++offset;
679 
680  if (offset == block_type::size)
681  {
682  // write current block
683 
684  block_manager * bm = block_manager::get_instance();
685  // allocate space for the block
686  result_.runs.resize(irun + 1);
687  result_.runs[irun].resize(iblock + 1);
688  bm->new_blocks(
689  alloc_strategy,
690  make_bid_iterator(result_.runs[irun].begin() + iblock),
691  make_bid_iterator(result_.runs[irun].end()),
692  iblock
693  );
694 
695  result_.runs[irun][iblock].value = (*cur_block)[0]; // init trigger
696  cur_block = writer.write(cur_block, result_.runs[irun][iblock].bid);
697  ++iblock;
698 
699  offset = 0;
700  }
701 
702  ++result_.elements;
703  }
704 
706  void finish()
707  {
708  if (offset == 0 && iblock == 0) // current run is empty
709  return;
710 
711 
712  result_.runs_sizes.resize(irun + 1);
713  result_.runs_sizes.back() = iblock * block_type::size + offset;
714 
715  if (offset) // if current block is partially filled
716  {
717  while (offset != block_type::size)
718  {
719  (*cur_block)[offset] = cmp.max_value();
720  ++offset;
721  }
722  offset = 0;
723 
724  block_manager * bm = block_manager::get_instance();
725  // allocate space for the block
726  result_.runs.resize(irun + 1);
727  result_.runs[irun].resize(iblock + 1);
728  bm->new_blocks(
729  alloc_strategy,
730  make_bid_iterator(result_.runs[irun].begin() + iblock),
731  make_bid_iterator(result_.runs[irun].end()),
732  iblock
733  );
734 
735  result_.runs[irun][iblock].value = (*cur_block)[0]; // init trigger
736  cur_block = writer.write(cur_block, result_.runs[irun][iblock].bid);
737  }
738  else
739  { }
740 
741  alloc_strategy = alloc_strategy_type(); // reinitialize block allocator for the next run
742  iblock = 0;
743  ++irun;
744  }
745 
 749  const sorted_runs_type & result()
 750  {
751  finish();
752  writer.flush();
753 
754  return result_;
755  }
756  };
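Usage sketch (not part of the header) for the from_sorted_sequences interface: two pre-sorted sequences are pushed and separated by finish(), producing one run each. int_cmp is the illustrative comparator from the sketch after the use_push specialization above.

    void example_from_sorted_sequences()
    {
        typedef stxxl::stream::runs_creator<
            stxxl::stream::from_sorted_sequences<int>, int_cmp> creator_type;

        creator_type creator(int_cmp(), 64 * 1024 * 1024);

        for (int i = 0; i < 1000; ++i)      // first sequence, already sorted
            creator.push(2 * i);
        creator.finish();                   // closes run 0

        for (int i = 0; i < 1000; ++i)      // second sequence, already sorted
            creator.push(2 * i + 1);
        creator.finish();                   // closes run 1

        const creator_type::sorted_runs_type & runs = creator.result();
        (void)runs;                         // two sorted runs, ready for merging
    }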
757 
758 
763  template <class RunsType_, class Cmp_>
764  bool check_sorted_runs(const RunsType_ & sruns, Cmp_ cmp)
765  {
766  sort_helper::verify_sentinel_strict_weak_ordering(cmp);
767  typedef typename RunsType_::block_type block_type;
768  typedef typename block_type::value_type value_type;
769  STXXL_VERBOSE2("Elements: " << sruns.elements);
770  unsigned_type nruns = sruns.runs.size();
771  STXXL_VERBOSE2("Runs: " << nruns);
772  unsigned_type irun = 0;
773  for (irun = 0; irun < nruns; ++irun)
774  {
775  const unsigned_type nblocks = sruns.runs[irun].size();
776  block_type * blocks = new block_type[nblocks];
777  request_ptr * reqs = new request_ptr[nblocks];
778  for (unsigned_type j = 0; j < nblocks; ++j)
779  {
780  reqs[j] = blocks[j].read(sruns.runs[irun][j].bid);
781  }
782  wait_all(reqs, reqs + nblocks);
783  for (unsigned_type j = 0; j < nblocks; ++j)
784  {
785  if (cmp(blocks[j][0], sruns.runs[irun][j].value) ||
786  cmp(sruns.runs[irun][j].value, blocks[j][0]))
787  {
788  STXXL_ERRMSG("check_sorted_runs wrong trigger in the run");
789  return false;
790  }
791  }
792  if (!stxxl::is_sorted(make_element_iterator(blocks, 0),
793  make_element_iterator(blocks, sruns.runs_sizes[irun]),
794  cmp))
795  {
796  STXXL_ERRMSG("check_sorted_runs wrong order in the run");
797  return false;
798  }
799 
800  delete[] reqs;
801  delete[] blocks;
802  }
803 
804  STXXL_MSG("Checking runs finished successfully");
805 
806  return true;
807  }
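A small debug-time sketch: runs produced by any of the creators above can be validated with this checker. The helper name verify_runs and the reuse of int_cmp from the earlier sketches are assumptions.

    #include <cassert>

    // Debug aid: abort if the trigger entries or the element order of a run are broken.
    template <class SortedRuns>
    void verify_runs(const SortedRuns & runs)
    {
        assert(stxxl::stream::check_sorted_runs(runs, int_cmp()));
    }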
808 
809 
811  // MERGE RUNS //
813 
820  template <class RunsType_,
821  class Cmp_,
822  class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY>
823  class basic_runs_merger : private noncopyable
824  {
825  protected:
826  typedef RunsType_ sorted_runs_type;
827  typedef AllocStr_ alloc_strategy;
828  typedef typename sorted_runs_type::size_type size_type;
829  typedef Cmp_ value_cmp;
830  typedef typename sorted_runs_type::run_type run_type;
831  typedef typename sorted_runs_type::block_type block_type;
832  typedef block_type out_block_type;
833  typedef typename run_type::value_type trigger_entry_type;
 834  typedef block_prefetcher<block_type, typename run_type::iterator> prefetcher_type;
 835  typedef run_cursor2<block_type, prefetcher_type> run_cursor_type;
836  typedef sort_helper::run_cursor2_cmp<block_type, prefetcher_type, value_cmp> run_cursor2_cmp_type;
837  typedef loser_tree<run_cursor_type, run_cursor2_cmp_type> loser_tree_type;
838  typedef stxxl::int64 diff_type;
839  typedef std::pair<typename block_type::iterator, typename block_type::iterator> sequence;
840  typedef typename std::vector<sequence>::size_type seqs_size_type;
841 
842  public:
844  typedef typename sorted_runs_type::value_type value_type;
845 
846  private:
847  sorted_runs_type sruns;
848  value_cmp cmp;
849  size_type elements_remaining;
850 
851  out_block_type * current_block;
852  unsigned_type buffer_pos;
853  value_type current_value; // cache for the current value
854 
855  run_type consume_seq;
856  int_type * prefetch_seq;
857  prefetcher_type * prefetcher;
858  loser_tree_type * losers;
859 #if STXXL_PARALLEL_MULTIWAY_MERGE
860  std::vector<sequence> * seqs;
861  std::vector<block_type *> * buffers;
862  diff_type num_currently_mergeable;
863 #endif
864 
865 #if STXXL_CHECK_ORDER_IN_SORTS
866  value_type last_element;
867 #endif //STXXL_CHECK_ORDER_IN_SORTS
868 
870 
871  void merge_recursively(unsigned_type memory_to_use);
872 
873  void deallocate_prefetcher()
874  {
875  if (prefetcher)
876  {
877  delete losers;
878 #if STXXL_PARALLEL_MULTIWAY_MERGE
879  delete seqs;
880  delete buffers;
881 #endif
882  delete prefetcher;
883  delete[] prefetch_seq;
884  prefetcher = NULL;
885  }
 886  // free the blocks of the runs (or should the user do it?)
887  sruns.deallocate_blocks();
888  }
889 
890  void fill_current_block()
891  {
892  STXXL_VERBOSE1("fill_current_block");
893  if (do_parallel_merge())
894  {
895 #if STXXL_PARALLEL_MULTIWAY_MERGE
896 // begin of STL-style merging
897  diff_type rest = out_block_type::size; // elements still to merge for this output block
898 
899  do // while rest > 0 and still elements available
900  {
901  if (num_currently_mergeable < rest)
902  {
903  if (!prefetcher || prefetcher->empty())
904  {
905  // anything remaining is already in memory
906  num_currently_mergeable = elements_remaining;
907  }
908  else
909  {
910  num_currently_mergeable = sort_helper::count_elements_less_equal(
911  *seqs, consume_seq[prefetcher->pos()].value, cmp);
912  }
913  }
914 
915  diff_type output_size = STXXL_MIN(num_currently_mergeable, rest); // at most rest elements
916 
917  STXXL_VERBOSE1("before merge " << output_size);
918 
919  stxxl::parallel::multiway_merge((*seqs).begin(), (*seqs).end(), current_block->end() - rest, cmp, output_size);
920  // sequence iterators are progressed appropriately
921 
922  rest -= output_size;
923  num_currently_mergeable -= output_size;
924 
925  STXXL_VERBOSE1("after merge");
926 
927  sort_helper::refill_or_remove_empty_sequences(*seqs, *buffers, *prefetcher);
928  } while (rest > 0 && (*seqs).size() > 0);
929 
930 #if STXXL_CHECK_ORDER_IN_SORTS
931  if (!stxxl::is_sorted(current_block->begin(), current_block->end(), cmp))
932  {
933  for (value_type * i = current_block->begin() + 1; i != current_block->end(); ++i)
934  if (cmp(*i, *(i - 1)))
935  {
936  STXXL_VERBOSE1("Error at position " << (i - current_block->begin()));
937  }
938  assert(false);
939  }
940 #endif //STXXL_CHECK_ORDER_IN_SORTS
941 
942 // end of STL-style merging
943 #else
944  STXXL_THROW_UNREACHABLE();
945 #endif //STXXL_PARALLEL_MULTIWAY_MERGE
946  }
947  else
948  {
949 // begin of native merging procedure
950  losers->multi_merge(current_block->elem, current_block->elem + STXXL_MIN<size_type>(out_block_type::size, elements_remaining));
951 // end of native merging procedure
952  }
953  STXXL_VERBOSE1("current block filled");
954 
955  if (elements_remaining <= out_block_type::size)
956  deallocate_prefetcher();
957  }
958 
959  public:
964  basic_runs_merger(const sorted_runs_type & r, value_cmp c, unsigned_type memory_to_use) :
965  sruns(r),
966  cmp(c),
967  elements_remaining(sruns.elements),
968  current_block(NULL),
969  buffer_pos(0),
970  prefetch_seq(NULL),
971  prefetcher(NULL),
972  losers(NULL)
973 #if STXXL_PARALLEL_MULTIWAY_MERGE
974  , seqs(NULL),
975  buffers(NULL),
976  num_currently_mergeable(0)
977 #endif
978 #if STXXL_CHECK_ORDER_IN_SORTS
979  , last_element(cmp.min_value())
980 #endif //STXXL_CHECK_ORDER_IN_SORTS
981  {
982  initialize(r, memory_to_use);
983  }
984 
985  protected:
986  void initialize(const sorted_runs_type & r, unsigned_type memory_to_use)
987  {
988  sort_helper::verify_sentinel_strict_weak_ordering(cmp);
989 
990  sruns = r;
991  elements_remaining = r.elements;
992 
993  if (empty())
994  return;
995 
996  if (!sruns.small_run().empty())
997  {
998  // we have a small input <= B, that is kept in the main memory
999  STXXL_VERBOSE1("basic_runs_merger: small input optimization, input length: " << elements_remaining);
1000  assert(elements_remaining == size_type(sruns.small_run().size()));
1001  assert(sruns.small_run().size() <= out_block_type::size);
1002  current_block = new out_block_type;
1003  std::copy(sruns.small_run().begin(), sruns.small_run().end(), current_block->begin());
1004  current_value = current_block->elem[0];
1005  buffer_pos = 1;
1006 
1007  return;
1008  }
1009 
1010 #if STXXL_CHECK_ORDER_IN_SORTS
1011  assert(check_sorted_runs(r, cmp));
1012 #endif //STXXL_CHECK_ORDER_IN_SORTS
1013 
1014  disk_queues::get_instance()->set_priority_op(request_queue::WRITE);
1015 
1016  int_type disks_number = config::get_instance()->disks_number();
1017  unsigned_type min_prefetch_buffers = 2 * disks_number;
1018  unsigned_type input_buffers = (memory_to_use > sizeof(out_block_type) ? memory_to_use - sizeof(out_block_type) : 0) / block_type::raw_size;
1019  unsigned_type nruns = sruns.runs.size();
1020 
1021  if (input_buffers < nruns + min_prefetch_buffers)
1022  {
1023  // can not merge runs in one pass
1024  // merge recursively:
 1025  STXXL_WARNMSG_RECURSIVE_SORT("The implementation of sort requires more than one merge pass, therefore for better");
 1026  STXXL_WARNMSG_RECURSIVE_SORT("efficiency decrease the block size of run storage (a parameter of the runs_creator)");
 1027  STXXL_WARNMSG_RECURSIVE_SORT("or increase the amount of memory dedicated to the merger.");
1028  STXXL_WARNMSG_RECURSIVE_SORT("m=" << input_buffers << " nruns=" << nruns << " prefetch_blocks=" << min_prefetch_buffers);
1029 
1030  // check whether we have enough memory to merge recursively
1031  unsigned_type recursive_merge_buffers = memory_to_use / block_type::raw_size;
1032  if (recursive_merge_buffers < 2 * min_prefetch_buffers + 1 + 2) {
 1033  // recursive merge uses min_prefetch_buffers for input buffering and min_prefetch_buffers for output buffering
1034  // as well as 1 current output block and at least 2 input blocks
 1035  STXXL_ERRMSG("There are only m=" << recursive_merge_buffers << " blocks available for recursive merging, but "
 1036  << min_prefetch_buffers << "+" << min_prefetch_buffers << "+1 are needed for read-ahead/write-back/output, and");
1037  STXXL_ERRMSG("the merger requires memory to store at least two input blocks internally. Aborting.");
1038  throw bad_parameter("basic_runs_merger::sort(): INSUFFICIENT MEMORY provided, please increase parameter 'memory_to_use'");
1039  }
1040 
1041  merge_recursively(memory_to_use);
1042 
1043  nruns = sruns.runs.size();
1044  }
1045 
1046  assert(nruns + min_prefetch_buffers <= input_buffers);
1047 
1048  unsigned_type i;
1049  unsigned_type prefetch_seq_size = 0;
1050  for (i = 0; i < nruns; ++i)
1051  {
1052  prefetch_seq_size += sruns.runs[i].size();
1053  }
1054 
1055  consume_seq.resize(prefetch_seq_size);
1056  prefetch_seq = new int_type[prefetch_seq_size];
1057 
1058  typename run_type::iterator copy_start = consume_seq.begin();
1059  for (i = 0; i < nruns; ++i)
1060  {
1061  copy_start = std::copy(
1062  sruns.runs[i].begin(),
1063  sruns.runs[i].end(),
1064  copy_start);
1065  }
1066 
1067  std::stable_sort(consume_seq.begin(), consume_seq.end(),
1068  sort_helper::trigger_entry_cmp<trigger_entry_type, value_cmp>(cmp) _STXXL_SORT_TRIGGER_FORCE_SEQUENTIAL);
1069 
1070  const unsigned_type n_prefetch_buffers = STXXL_MAX(min_prefetch_buffers, input_buffers - nruns);
1071 
1072 #if STXXL_SORT_OPTIMAL_PREFETCHING
1073  // heuristic
1074  const int_type n_opt_prefetch_buffers = min_prefetch_buffers + (3 * (n_prefetch_buffers - min_prefetch_buffers)) / 10;
1075 
1076  compute_prefetch_schedule(
1077  consume_seq,
1078  prefetch_seq,
1079  n_opt_prefetch_buffers,
1080  disks_number);
1081 #else
1082  for (i = 0; i < prefetch_seq_size; ++i)
1083  prefetch_seq[i] = i;
1084 #endif //STXXL_SORT_OPTIMAL_PREFETCHING
1085 
1086  prefetcher = new prefetcher_type(
1087  consume_seq.begin(),
1088  consume_seq.end(),
1089  prefetch_seq,
1090  STXXL_MIN(nruns + n_prefetch_buffers, prefetch_seq_size));
1091 
1092  if (do_parallel_merge())
1093  {
1094 #if STXXL_PARALLEL_MULTIWAY_MERGE
1095 // begin of STL-style merging
1096  seqs = new std::vector<sequence>(nruns);
1097  buffers = new std::vector<block_type *>(nruns);
1098 
1099  for (unsigned_type i = 0; i < nruns; ++i) //initialize sequences
1100  {
1101  (*buffers)[i] = prefetcher->pull_block(); //get first block of each run
1102  (*seqs)[i] = std::make_pair((*buffers)[i]->begin(), (*buffers)[i]->end()); //this memory location stays the same, only the data is exchanged
1103  }
1104 // end of STL-style merging
1105 #else
1106  STXXL_THROW_UNREACHABLE();
1107 #endif //STXXL_PARALLEL_MULTIWAY_MERGE
1108  }
1109  else
1110  {
1111 // begin of native merging procedure
1112  losers = new loser_tree_type(prefetcher, nruns, run_cursor2_cmp_type(cmp));
1113 // end of native merging procedure
1114  }
1115 
1116  current_block = new out_block_type;
1117  fill_current_block();
1118 
1119  current_value = current_block->elem[0];
1120  buffer_pos = 1;
1121  }
1122 
1123  public:
1125  bool empty() const
1126  {
1127  return elements_remaining == 0;
1128  }
1129 
1131  const value_type & operator * () const
1132  {
1133  assert(!empty());
1134  return current_value;
1135  }
1136 
1138  const value_type * operator -> () const
1139  {
1140  return &(operator * ());
1141  }
1142 
1144  basic_runs_merger & operator ++ () // preincrement operator
1145  {
1146  assert(!empty());
1147 
1148  --elements_remaining;
1149 
1150  if (buffer_pos != out_block_type::size)
1151  {
1152  current_value = current_block->elem[buffer_pos];
1153  ++buffer_pos;
1154  }
1155  else
1156  {
1157  if (!empty())
1158  {
1159  fill_current_block();
1160 
1161 #if STXXL_CHECK_ORDER_IN_SORTS
1162  assert(stxxl::is_sorted(current_block->elem, current_block->elem + STXXL_MIN<size_type>(elements_remaining, current_block->size), cmp));
1163  assert(!cmp(current_block->elem[0], current_value));
1164 #endif //STXXL_CHECK_ORDER_IN_SORTS
1165  current_value = current_block->elem[0];
1166  buffer_pos = 1;
1167  }
1168  }
1169 
1170 #if STXXL_CHECK_ORDER_IN_SORTS
1171  if (!empty())
1172  {
1173  assert(!cmp(current_value, last_element));
1174  last_element = current_value;
1175  }
1176 #endif //STXXL_CHECK_ORDER_IN_SORTS
1177 
1178  return *this;
1179  }
1180 
 1183  virtual ~basic_runs_merger()
 1184  {
1185  deallocate_prefetcher();
1186  delete current_block;
1187  }
1188  };
1189 
1190 
1191  template <class RunsType_, class Cmp_, class AllocStr_>
1192  void basic_runs_merger<RunsType_, Cmp_, AllocStr_>::merge_recursively(unsigned_type memory_to_use)
1193  {
1194  block_manager * bm = block_manager::get_instance();
1195  unsigned_type ndisks = config::get_instance()->disks_number();
1196  unsigned_type nwrite_buffers = 2 * ndisks;
1197  unsigned_type memory_for_write_buffers = nwrite_buffers * sizeof(block_type);
1198 
1199  // memory consumption of the recursive merger (uses block_type as out_block_type)
1200  unsigned_type recursive_merger_memory_prefetch_buffers = 2 * ndisks * sizeof(block_type);
1201  unsigned_type recursive_merger_memory_out_block = sizeof(block_type);
1202  unsigned_type memory_for_buffers = memory_for_write_buffers
1203  + recursive_merger_memory_prefetch_buffers
1204  + recursive_merger_memory_out_block;
1205  // maximum arity in the recursive merger
1206  unsigned_type max_arity = (memory_to_use > memory_for_buffers ? memory_to_use - memory_for_buffers : 0) / block_type::raw_size;
1207 
1208  unsigned_type nruns = sruns.runs.size();
1209  const unsigned_type merge_factor = optimal_merge_factor(nruns, max_arity);
1210  assert(merge_factor > 1);
1211  assert(merge_factor <= max_arity);
1212  while (nruns > max_arity)
1213  {
1214  unsigned_type new_nruns = div_ceil(nruns, merge_factor);
1215  STXXL_VERBOSE("Starting new merge phase: nruns: " << nruns <<
1216  " opt_merge_factor: " << merge_factor << " max_arity: " << max_arity << " new_nruns: " << new_nruns);
1217 
1218  sorted_runs_type new_runs;
1219  new_runs.runs.resize(new_nruns);
1220  new_runs.runs_sizes.resize(new_nruns);
1221  new_runs.elements = sruns.elements;
1222 
1223  unsigned_type runs_left = nruns;
1224  unsigned_type cur_out_run = 0;
1225  unsigned_type elements_in_new_run = 0;
1226 
1227  while (runs_left > 0)
1228  {
1229  int_type runs2merge = STXXL_MIN(runs_left, merge_factor);
1230  elements_in_new_run = 0;
1231  for (unsigned_type i = nruns - runs_left; i < (nruns - runs_left + runs2merge); ++i)
1232  {
1233  elements_in_new_run += sruns.runs_sizes[i];
1234  }
1235  const unsigned_type blocks_in_new_run1 = div_ceil(elements_in_new_run, block_type::size);
1236 
1237  new_runs.runs_sizes[cur_out_run] = elements_in_new_run;
1238  // allocate run
1239  new_runs.runs[cur_out_run++].resize(blocks_in_new_run1);
1240  runs_left -= runs2merge;
1241  }
1242 
1243  // allocate blocks for the new runs
1244  for (unsigned_type i = 0; i < new_runs.runs.size(); ++i)
1245  bm->new_blocks(alloc_strategy(), make_bid_iterator(new_runs.runs[i].begin()), make_bid_iterator(new_runs.runs[i].end()));
1246 
1247  // merge all
1248  runs_left = nruns;
1249  cur_out_run = 0;
1250  size_type elements_left = sruns.elements;
1251 
1252  while (runs_left > 0)
1253  {
1254  unsigned_type runs2merge = STXXL_MIN(runs_left, merge_factor);
1255  STXXL_VERBOSE("Merging " << runs2merge << " runs");
1256 
1257  sorted_runs_type cur_runs;
1258  cur_runs.runs.resize(runs2merge);
1259  cur_runs.runs_sizes.resize(runs2merge);
1260 
1261  std::copy(sruns.runs.begin() + nruns - runs_left,
1262  sruns.runs.begin() + nruns - runs_left + runs2merge,
1263  cur_runs.runs.begin());
1264  std::copy(sruns.runs_sizes.begin() + nruns - runs_left,
1265  sruns.runs_sizes.begin() + nruns - runs_left + runs2merge,
1266  cur_runs.runs_sizes.begin());
1267 
1268  runs_left -= runs2merge;
1269  /*
1270  cur_runs.elements = (runs_left)?
1271  (new_runs.runs[cur_out_run].size()*block_type::size):
1272  (elements_left);
1273  */
1274  cur_runs.elements = new_runs.runs_sizes[cur_out_run];
1275  elements_left -= cur_runs.elements;
1276 
1277  if (runs2merge > 1)
1278  {
1279  basic_runs_merger<RunsType_, Cmp_, AllocStr_> merger(cur_runs, cmp, memory_to_use - memory_for_write_buffers);
1280 
 1281  { // make sure everything is destroyed at the right time
 1282  buf_ostream<block_type, typename run_type::iterator> out(
 1283  new_runs.runs[cur_out_run].begin(),
1284  nwrite_buffers);
1285 
1286  size_type cnt = 0;
1287  const size_type cnt_max = cur_runs.elements;
1288 
1289  while (cnt != cnt_max)
1290  {
1291  *out = *merger;
1292  if ((cnt % block_type::size) == 0) // have to write the trigger value
1293  new_runs.runs[cur_out_run][cnt / size_type(block_type::size)].value = *merger;
1294 
1295  ++cnt;
1296  ++out;
1297  ++merger;
1298  }
1299  assert(merger.empty());
1300 
1301  while (cnt % block_type::size)
1302  {
1303  *out = cmp.max_value();
1304  ++out;
1305  ++cnt;
1306  }
1307  }
1308  }
1309  else
1310  {
1311  bm->delete_blocks(
1312  make_bid_iterator(new_runs.runs.back().begin()),
1313  make_bid_iterator(new_runs.runs.back().end()));
1314 
1315  assert(cur_runs.runs.size() == 1);
1316 
1317  std::copy(cur_runs.runs.front().begin(),
1318  cur_runs.runs.front().end(),
1319  new_runs.runs.back().begin());
1320  new_runs.runs_sizes.back() = cur_runs.runs_sizes.front();
1321  }
1322 
1323  ++cur_out_run;
1324  }
1325  assert(elements_left == 0);
1326 
1327  nruns = new_nruns;
1328  sruns = new_runs;
1329  }
1330  }
1331 
1332 
1339  template <class RunsType_,
1340  class Cmp_,
1341  class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY>
1342  class runs_merger : public basic_runs_merger<RunsType_, Cmp_, AllocStr_>
1343  {
1344  private:
1345  typedef RunsType_ sorted_runs_type;
 1346  typedef basic_runs_merger<RunsType_, Cmp_, AllocStr_> base;
 1347  typedef typename base::value_cmp value_cmp;
1348  typedef typename base::block_type block_type;
1349 
1350  public:
1355  runs_merger(const sorted_runs_type & r, value_cmp c, unsigned_type memory_to_use) :
1356  base(r, c, memory_to_use)
1357  { }
1358  };
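Putting run formation and merging together: a minimal sketch (not part of the header) that pushes unsorted int data, merges the resulting runs, and consumes the sorted output stream element by element. Function and variable names are illustrative; int_cmp is the comparator from the earlier sketch.

    #include <stxxl/bits/stream/sort_stream.h>
    #include <vector>

    void example_create_and_merge(const std::vector<int> & data, std::vector<int> & out)
    {
        const stxxl::unsigned_type memory = 64 * 1024 * 1024;

        typedef stxxl::stream::runs_creator<
            stxxl::stream::use_push<int>, int_cmp> creator_type;
        typedef stxxl::stream::runs_merger<
            creator_type::sorted_runs_type, int_cmp> merger_type;

        creator_type creator(int_cmp(), memory);
        for (std::vector<int>::const_iterator it = data.begin(); it != data.end(); ++it)
            creator.push(*it);

        merger_type merger(creator.result(), int_cmp(), memory);
        while (!merger.empty())             // read out the sorted stream
        {
            out.push_back(*merger);
            ++merger;
        }
    }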
1359 
1360 
1362  // SORT //
1364 
1372  template <class Input_,
1373  class Cmp_,
1374  unsigned BlockSize_ = STXXL_DEFAULT_BLOCK_SIZE(typename Input_::value_type),
1375  class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY,
1376  class runs_creator_type = runs_creator<Input_, Cmp_, BlockSize_, AllocStr_> >
1377  class sort : public noncopyable
1378  {
1379  typedef typename runs_creator_type::sorted_runs_type sorted_runs_type;
 1380  typedef runs_merger<sorted_runs_type, Cmp_, AllocStr_> runs_merger_type;
 1381 
1382  runs_creator_type creator;
1383  runs_merger_type merger;
1384 
1385  public:
1387  typedef typename Input_::value_type value_type;
1388 
1393  sort(Input_ & in, Cmp_ c, unsigned_type memory_to_use) :
1394  creator(in, c, memory_to_use),
1395  merger(creator.result(), c, memory_to_use)
1396  {
1397  sort_helper::verify_sentinel_strict_weak_ordering(c);
1398  }
1399 
1405  sort(Input_ & in, Cmp_ c, unsigned_type memory_to_use_rc, unsigned_type memory_to_use_m) :
1406  creator(in, c, memory_to_use_rc),
1407  merger(creator.result(), c, memory_to_use_m)
1408  {
1409  sort_helper::verify_sentinel_strict_weak_ordering(c);
1410  }
1411 
1412 
1414  bool empty() const
1415  {
1416  return merger.empty();
1417  }
1418 
1420  const value_type & operator * () const
1421  {
1422  assert(!empty());
1423  return *merger;
1424  }
1425 
1426  const value_type * operator -> () const
1427  {
1428  assert(!empty());
1429  return merger.operator -> ();
1430  }
1431 
 1433  sort & operator ++ ()
 1434  {
1435  ++merger;
1436  return *this;
1437  }
1438  };
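The sort stream above combines both stages in one object. A minimal in-place sketch (not part of the header): it uses the same __typeof__ idiom as the convenience wrapper further below to name the streamified iterator range (GCC; on MSVC the streamify_traits typedef shown there would be used instead), and int_cmp is again the illustrative comparator from the earlier sketches.

    #include <stxxl/bits/stream/sort_stream.h>
    #include <vector>

    void example_stream_sort(std::vector<int> & data)
    {
        const stxxl::unsigned_type memory = 64 * 1024 * 1024;

        typedef __typeof__(stxxl::stream::streamify(data.begin(), data.end())) input_type;
        typedef stxxl::stream::sort<input_type, int_cmp> sorter_type;

        input_type input = stxxl::stream::streamify(data.begin(), data.end());
        sorter_type sorter(input, int_cmp(), memory);   // runs are created and merger is set up here

        // write the sorted stream back over the original range
        stxxl::stream::materialize(sorter, data.begin());
    }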
1439 
1444  template <
1445  class ValueType_,
1446  unsigned BlockSize_>
 1447  class compute_sorted_runs_type
 1448  {
1449  typedef ValueType_ value_type;
1450  typedef BID<BlockSize_> bid_type;
1451  typedef sort_helper::trigger_entry<bid_type, value_type> trigger_entry_type;
1452 
1453  public:
 1454  typedef sorted_runs<trigger_entry_type> result;
 1455  };
1456 
1458 }
1459 
1462 
1464 
1473 template <unsigned BlockSize,
1474  class RandomAccessIterator,
1475  class CmpType,
1476  class AllocStr>
1477 void sort(RandomAccessIterator begin,
1478  RandomAccessIterator end,
1479  CmpType cmp,
1480  unsigned_type MemSize,
1481  AllocStr AS)
1482 {
1483  STXXL_UNUSED(AS);
1484 #ifdef BOOST_MSVC
1485  typedef typename streamify_traits<RandomAccessIterator>::stream_type InputType;
1486 #else
1487  typedef __typeof__(stream::streamify(begin, end)) InputType;
1488 #endif //BOOST_MSVC
1489  InputType Input(begin, end);
 1490  typedef stream::sort<InputType, CmpType, BlockSize, AllocStr> sorter_type;
 1491  sorter_type Sort(Input, cmp, MemSize);
1492  stream::materialize(Sort, begin);
1493 }
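Calling the iterator-based convenience wrapper defined above: the block size is passed explicitly as the first template parameter, the remaining parameters are deduced. The container, block size and memory budget are illustrative; int_cmp is the comparator from the earlier sketches.

    #include <vector>

    void example_iterator_sort(std::vector<int> & v)
    {
        // 1 MiB blocks, 64 MiB of sorting memory, default allocation strategy
        stxxl::sort<1024 * 1024>(v.begin(), v.end(), int_cmp(),
                                 64 * 1024 * 1024, STXXL_DEFAULT_ALLOC_STRATEGY());
    }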
1494 
1496 
1497 __STXXL_END_NAMESPACE
1498 
1499 #endif // !STXXL_SORT_STREAM_HEADER
1500 // vim: et:ts=4:sw=4