Stxxl  1.3.2
sort.h
1 /***************************************************************************
2  * include/stxxl/bits/algo/sort.h
3  *
4  * Part of the STXXL. See http://stxxl.sourceforge.net
5  *
6  * Copyright (C) 2002-2003 Roman Dementiev <[email protected]>
7  * Copyright (C) 2006 Johannes Singler <[email protected]>
8  * Copyright (C) 2008-2011 Andreas Beckmann <[email protected]>
9  *
10  * Distributed under the Boost Software License, Version 1.0.
11  * (See accompanying file LICENSE_1_0.txt or copy at
12  * http://www.boost.org/LICENSE_1_0.txt)
13  **************************************************************************/
14 
15 #ifndef STXXL_SORT_HEADER
16 #define STXXL_SORT_HEADER
17 
18 #include <functional>
19 
20 #include <stxxl/bits/mng/mng.h>
21 #include <stxxl/bits/common/rand.h>
22 #include <stxxl/bits/mng/adaptor.h>
23 #include <stxxl/bits/common/simple_vector.h>
25 #include <stxxl/bits/mng/block_alloc_interleaved.h>
26 #include <stxxl/bits/io/request_operations.h>
27 #include <stxxl/bits/algo/sort_base.h>
28 #include <stxxl/bits/algo/sort_helper.h>
29 #include <stxxl/bits/algo/intksort.h>
30 #include <stxxl/bits/algo/adaptor.h>
31 #include <stxxl/bits/algo/async_schedule.h>
32 #include <stxxl/bits/mng/block_prefetcher.h>
33 #include <stxxl/bits/mng/buf_writer.h>
34 #include <stxxl/bits/algo/run_cursor.h>
35 #include <stxxl/bits/algo/losertree.h>
36 #include <stxxl/bits/algo/inmemsort.h>
37 #include <stxxl/bits/parallel.h>
38 #include <stxxl/bits/common/is_sorted.h>
39 
40 
41 __STXXL_BEGIN_NAMESPACE
42 
45 
46 
49 namespace sort_local
50 {
51  template <typename block_type, typename bid_type>
52  struct read_next_after_write_completed
53  {
54  block_type * block;
55  bid_type bid;
56  request_ptr * req;
57  void operator () (request * /*completed_req*/)
58  {
59  *req = block->read(bid);
60  }
61  };
62 
63 
64  template <
65  typename block_type,
66  typename run_type,
67  typename input_bid_iterator,
68  typename value_cmp>
69  void
70  create_runs(
71  input_bid_iterator it,
72  run_type ** runs,
73  int_type nruns,
74  int_type _m,
75  value_cmp cmp)
76  {
77  typedef typename block_type::value_type type;
78  typedef typename block_type::bid_type bid_type;
79  STXXL_VERBOSE1("stxxl::create_runs nruns=" << nruns << " m=" << _m);
80 
81  int_type m2 = _m / 2;
82  block_manager * bm = block_manager::get_instance();
83  block_type * Blocks1 = new block_type[m2];
84  block_type * Blocks2 = new block_type[m2];
85  bid_type * bids1 = new bid_type[m2];
86  bid_type * bids2 = new bid_type[m2];
87  request_ptr * read_reqs1 = new request_ptr[m2];
88  request_ptr * read_reqs2 = new request_ptr[m2];
89  request_ptr * write_reqs = new request_ptr[m2];
90  read_next_after_write_completed<block_type, bid_type> * next_run_reads =
91  new read_next_after_write_completed<block_type, bid_type>[m2];
92 
93  disk_queues::get_instance()->set_priority_op(request_queue::WRITE);
94 
95  int_type i;
96  int_type run_size = 0;
97 
98  assert(nruns >= 2);
99 
100  run_size = runs[0]->size();
101  assert(run_size == m2);
102 
103  for (i = 0; i < run_size; ++i)
104  {
105  STXXL_VERBOSE1("stxxl::create_runs posting read " << long(Blocks1[i].elem));
106  bids1[i] = *(it++);
107  read_reqs1[i] = Blocks1[i].read(bids1[i]);
108  }
109 
110  run_size = runs[1]->size();
111 
112  for (i = 0; i < run_size; ++i)
113  {
114  STXXL_VERBOSE1("stxxl::create_runs posting read " << long(Blocks2[i].elem));
115  bids2[i] = *(it++);
116  read_reqs2[i] = Blocks2[i].read(bids2[i]);
117  }
118 
119  for (int_type k = 0; k < nruns - 1; ++k)
120  {
121  run_type * run = runs[k];
122  run_size = run->size();
123  assert(run_size == m2);
124  #ifndef NDEBUG
125  int_type next_run_size = runs[k + 1]->size();
126  #endif
127  assert((next_run_size == m2) || (next_run_size <= m2 && k == nruns - 2));
128 
129  STXXL_VERBOSE1("stxxl::create_runs start waiting read_reqs1");
130  wait_all(read_reqs1, run_size);
131  STXXL_VERBOSE1("stxxl::create_runs finish waiting read_reqs1");
132  for (i = 0; i < run_size; ++i)
133  bm->delete_block(bids1[i]);
134 
136  sort(make_element_iterator(Blocks1, 0),
137  make_element_iterator(Blocks1, run_size * block_type::size),
138  cmp);
139 
140  STXXL_VERBOSE1("stxxl::create_runs start waiting write_reqs");
141  if (k > 0)
142  wait_all(write_reqs, m2);
143  STXXL_VERBOSE1("stxxl::create_runs finish waiting write_reqs");
144 
145  int_type runplus2size = (k < nruns - 2) ? runs[k + 2]->size() : 0;
146  for (i = 0; i < m2; ++i)
147  {
148  STXXL_VERBOSE1("stxxl::create_runs posting write " << long(Blocks1[i].elem));
149  (*run)[i].value = Blocks1[i][0];
150  if (i >= runplus2size) {
151  write_reqs[i] = Blocks1[i].write((*run)[i].bid);
152  }
153  else
154  {
155  next_run_reads[i].block = Blocks1 + i;
156  next_run_reads[i].req = read_reqs1 + i;
157  bids1[i] = next_run_reads[i].bid = *(it++);
158  write_reqs[i] = Blocks1[i].write((*run)[i].bid, next_run_reads[i]);
159  }
160  }
161  std::swap(Blocks1, Blocks2);
162  std::swap(bids1, bids2);
163  std::swap(read_reqs1, read_reqs2);
164  }
165 
166  run_type * run = runs[nruns - 1];
167  run_size = run->size();
168  STXXL_VERBOSE1("stxxl::create_runs start waiting read_reqs1");
169  wait_all(read_reqs1, run_size);
170  STXXL_VERBOSE1("stxxl::create_runs finish waiting read_reqs1");
171  for (i = 0; i < run_size; ++i)
172  bm->delete_block(bids1[i]);
173 
175  sort(make_element_iterator(Blocks1, 0),
176  make_element_iterator(Blocks1, run_size * block_type::size),
177  cmp);
178 
179  STXXL_VERBOSE1("stxxl::create_runs start waiting write_reqs");
180  wait_all(write_reqs, m2);
181  STXXL_VERBOSE1("stxxl::create_runs finish waiting write_reqs");
182 
183  for (i = 0; i < run_size; ++i)
184  {
185  STXXL_VERBOSE1("stxxl::create_runs posting write " << long(Blocks1[i].elem));
186  (*run)[i].value = Blocks1[i][0];
187  write_reqs[i] = Blocks1[i].write((*run)[i].bid);
188  }
189 
190  STXXL_VERBOSE1("stxxl::create_runs start waiting write_reqs");
191  wait_all(write_reqs, run_size);
192  STXXL_VERBOSE1("stxxl::create_runs finish waiting write_reqs");
193 
194  delete[] Blocks1;
195  delete[] Blocks2;
196  delete[] bids1;
197  delete[] bids2;
198  delete[] read_reqs1;
199  delete[] read_reqs2;
200  delete[] write_reqs;
201  delete[] next_run_reads;
202  }
203 
204 
205  template <typename block_type, typename run_type, typename value_cmp>
206  bool check_sorted_runs(run_type ** runs,
207  unsigned_type nruns,
208  unsigned_type m,
209  value_cmp cmp)
210  {
211  typedef typename block_type::value_type value_type;
212 
213  STXXL_MSG("check_sorted_runs Runs: " << nruns);
214  unsigned_type irun = 0;
215  for (irun = 0; irun < nruns; ++irun)
216  {
217  const unsigned_type nblocks_per_run = runs[irun]->size();
218  unsigned_type blocks_left = nblocks_per_run;
219  block_type * blocks = new block_type[m];
220  request_ptr * reqs = new request_ptr[m];
221  value_type last = cmp.min_value();
222 
223  for (unsigned_type off = 0; off < nblocks_per_run; off += m)
224  {
225  const unsigned_type nblocks = STXXL_MIN(blocks_left, m);
226  const unsigned_type nelements = nblocks * block_type::size;
227  blocks_left -= nblocks;
228 
229  for (unsigned_type j = 0; j < nblocks; ++j)
230  {
231  reqs[j] = blocks[j].read((*runs[irun])[j + off].bid);
232  }
233  wait_all(reqs, reqs + nblocks);
234 
235  if (off && cmp(blocks[0][0], last))
236  {
237  STXXL_MSG("check_sorted_runs wrong first value in the run " << irun);
238  STXXL_MSG(" first value: " << blocks[0][0]);
239  STXXL_MSG(" last value: " << last);
240  for (unsigned_type k = 0; k < block_type::size; ++k)
241  STXXL_MSG("Element " << k << " in the block is :" << blocks[0][k]);
242 
243  return false;
244  }
245 
246  for (unsigned_type j = 0; j < nblocks; ++j)
247  {
248  if (!(blocks[j][0] == (*runs[irun])[j + off].value))
249  {
250  STXXL_MSG("check_sorted_runs wrong trigger in the run " << irun << " block " << (j + off));
251  STXXL_MSG(" trigger value: " << (*runs[irun])[j + off].value);
252  STXXL_MSG("Data in the block:");
253  for (unsigned_type k = 0; k < block_type::size; ++k)
254  STXXL_MSG("Element " << k << " in the block is :" << blocks[j][k]);
255 
256  STXXL_MSG("BIDS:");
257  for (unsigned_type k = 0; k < nblocks; ++k)
258  {
259  if (k == j)
260  STXXL_MSG("Bad one comes next.");
261  STXXL_MSG("BID " << (k + off) << " is: " << ((*runs[irun])[k + off].bid));
262  }
263 
264  return false;
265  }
266  }
267  if (!stxxl::is_sorted(make_element_iterator(blocks, 0),
268  make_element_iterator(blocks, nelements),
269  cmp))
270  {
271  STXXL_MSG("check_sorted_runs wrong order in the run " << irun);
272  STXXL_MSG("Data in blocks:");
273  for (unsigned_type j = 0; j < nblocks; ++j)
274  {
275  for (unsigned_type k = 0; k < block_type::size; ++k)
276  STXXL_MSG(" Element " << k << " in block " << (j + off) << " is :" << blocks[j][k]);
277  }
278  STXXL_MSG("BIDS:");
279  for (unsigned_type k = 0; k < nblocks; ++k)
280  {
281  STXXL_MSG("BID " << (k + off) << " is: " << ((*runs[irun])[k + off].bid));
282  }
283 
284  return false;
285  }
286 
287  last = blocks[nblocks - 1][block_type::size - 1];
288  }
289 
290  assert(blocks_left == 0);
291  delete[] reqs;
292  delete[] blocks;
293  }
294 
295  return true;
296  }
297 
298 
299  template <typename block_type, typename run_type, typename value_cmp>
300  void merge_runs(run_type ** in_runs, int_type nruns, run_type * out_run, unsigned_type _m, value_cmp cmp)
301  {
302  typedef typename block_type::value_type value_type;
303  typedef typename run_type::value_type trigger_entry_type;
305  typedef run_cursor2<block_type, prefetcher_type> run_cursor_type;
306  typedef sort_helper::run_cursor2_cmp<block_type, prefetcher_type, value_cmp> run_cursor2_cmp_type;
307 
308  run_type consume_seq(out_run->size());
309 
310  int_type * prefetch_seq = new int_type[out_run->size()];
311 
312  typename run_type::iterator copy_start = consume_seq.begin();
313  for (int_type i = 0; i < nruns; i++)
314  {
315  // TODO: try to avoid copy
316  copy_start = std::copy(
317  in_runs[i]->begin(),
318  in_runs[i]->end(),
319  copy_start);
320  }
321 
322  std::stable_sort(consume_seq.begin(), consume_seq.end(),
323  sort_helper::trigger_entry_cmp<trigger_entry_type, value_cmp>(cmp) _STXXL_SORT_TRIGGER_FORCE_SEQUENTIAL);
324 
325  int_type disks_number = config::get_instance()->disks_number();
326 
327 #ifdef PLAY_WITH_OPT_PREF
328  const int_type n_write_buffers = 4 * disks_number;
329 #else
330  const int_type n_prefetch_buffers = STXXL_MAX(2 * disks_number, (3 * (int_type(_m) - nruns) / 4));
331  const int_type n_write_buffers = STXXL_MAX(2 * disks_number, int_type(_m) - nruns - n_prefetch_buffers);
332  #if STXXL_SORT_OPTIMAL_PREFETCHING
333  // heuristic
334  const int_type n_opt_prefetch_buffers = 2 * disks_number + (3 * (n_prefetch_buffers - 2 * disks_number)) / 10;
335  #endif
336 #endif
337 
338 #if STXXL_SORT_OPTIMAL_PREFETCHING
339  compute_prefetch_schedule(
340  consume_seq,
341  prefetch_seq,
342  n_opt_prefetch_buffers,
343  disks_number);
344 #else
345  for (unsigned_type i = 0; i < out_run->size(); i++)
346  prefetch_seq[i] = i;
347 
348 #endif
349 
350  prefetcher_type prefetcher(consume_seq.begin(),
351  consume_seq.end(),
352  prefetch_seq,
353  nruns + n_prefetch_buffers);
354 
355  buffered_writer<block_type> writer(n_write_buffers, n_write_buffers / 2);
356 
357  int_type out_run_size = out_run->size();
358 
359  block_type * out_buffer = writer.get_free_block();
360 
361 //If parallelism is activated, one can still fall back to the
362 //native merge routine by setting stxxl::SETTINGS::native_merge= true, //otherwise, it is used anyway.
363 
364  if (do_parallel_merge())
365  {
366 #if STXXL_PARALLEL_MULTIWAY_MERGE
367 
368 // begin of STL-style merging
369 
370  typedef stxxl::int64 diff_type;
371  typedef std::pair<typename block_type::iterator, typename block_type::iterator> sequence;
372  typedef typename std::vector<sequence>::size_type seqs_size_type;
373  std::vector<sequence> seqs(nruns);
374  std::vector<block_type *> buffers(nruns);
375 
376  for (int_type i = 0; i < nruns; i++) // initialize sequences
377  {
378  buffers[i] = prefetcher.pull_block(); // get first block of each run
379  seqs[i] = std::make_pair(buffers[i]->begin(), buffers[i]->end());
380  // this memory location stays the same, only the data is exchanged
381  }
382 
383  #if STXXL_CHECK_ORDER_IN_SORTS
384  value_type last_elem = cmp.min_value();
385  #endif
386  diff_type num_currently_mergeable = 0;
387 
388  for (int_type j = 0; j < out_run_size; ++j) // for the whole output run, out_run_size is in blocks
389  {
390  diff_type rest = block_type::size; // elements still to merge for this output block
391 
392  STXXL_VERBOSE1("output block " << j);
393  do {
394  if (num_currently_mergeable < rest)
395  {
396  if (prefetcher.empty())
397  {
398  // anything remaining is already in memory
399  num_currently_mergeable = (out_run_size - j) * block_type::size
400  - (block_type::size - rest);
401  }
402  else
403  {
404  num_currently_mergeable = sort_helper::count_elements_less_equal(
405  seqs, consume_seq[prefetcher.pos()].value, cmp);
406  }
407  }
408 
409  diff_type output_size = STXXL_MIN(num_currently_mergeable, rest); // at most rest elements
410 
411  STXXL_VERBOSE1("before merge " << output_size);
412 
413  stxxl::parallel::multiway_merge(seqs.begin(), seqs.end(), out_buffer->end() - rest, cmp, output_size);
414  // sequence iterators are progressed appropriately
415 
416  rest -= output_size;
417  num_currently_mergeable -= output_size;
418 
419  STXXL_VERBOSE1("after merge");
420 
421  sort_helper::refill_or_remove_empty_sequences(seqs, buffers, prefetcher);
422  } while (rest > 0 && seqs.size() > 0);
423 
424  #if STXXL_CHECK_ORDER_IN_SORTS
425  if (!stxxl::is_sorted(out_buffer->begin(), out_buffer->end(), cmp))
426  {
427  for (value_type * i = out_buffer->begin() + 1; i != out_buffer->end(); i++)
428  if (cmp(*i, *(i - 1)))
429  {
430  STXXL_VERBOSE1("Error at position " << (i - out_buffer->begin()));
431  }
432  assert(false);
433  }
434 
435  if (j > 0) // do not check in first iteration
436  assert(cmp((*out_buffer)[0], last_elem) == false);
437 
438  last_elem = (*out_buffer)[block_type::size - 1];
439  #endif
440 
441  (*out_run)[j].value = (*out_buffer)[0]; // save smallest value
442 
443  out_buffer = writer.write(out_buffer, (*out_run)[j].bid);
444  }
445 
446 // end of STL-style merging
447 
448 #else
449  STXXL_THROW_UNREACHABLE();
450 #endif
451  }
452  else
453  {
454 // begin of native merging procedure
455 
456  loser_tree<run_cursor_type, run_cursor2_cmp_type>
457  losers(&prefetcher, nruns, run_cursor2_cmp_type(cmp));
458 
459 #if STXXL_CHECK_ORDER_IN_SORTS
460  value_type last_elem = cmp.min_value();
461 #endif
462 
463  for (int_type i = 0; i < out_run_size; ++i)
464  {
465  losers.multi_merge(out_buffer->elem, out_buffer->elem + block_type::size);
466  (*out_run)[i].value = *(out_buffer->elem);
467 
468 #if STXXL_CHECK_ORDER_IN_SORTS
469  assert(stxxl::is_sorted(
470  out_buffer->begin(),
471  out_buffer->end(),
472  cmp));
473 
474  if (i)
475  assert(cmp(*(out_buffer->elem), last_elem) == false);
476 
477  last_elem = (*out_buffer).elem[block_type::size - 1];
478 #endif
479 
480  out_buffer = writer.write(out_buffer, (*out_run)[i].bid);
481  }
482 
483 // end of native merging procedure
484  }
485 
486  delete[] prefetch_seq;
487 
488  block_manager * bm = block_manager::get_instance();
489  for (int_type i = 0; i < nruns; ++i)
490  {
491  unsigned_type sz = in_runs[i]->size();
492  for (unsigned_type j = 0; j < sz; ++j)
493  bm->delete_block((*in_runs[i])[j].bid);
494 
495 
496  delete in_runs[i];
497  }
498  }
499 
500 
501  template <typename block_type,
502  typename alloc_strategy,
503  typename input_bid_iterator,
504  typename value_cmp>
505  simple_vector<sort_helper::trigger_entry<block_type> > *
506  sort_blocks(input_bid_iterator input_bids,
507  unsigned_type _n,
508  unsigned_type _m,
509  value_cmp cmp
510  )
511  {
512  typedef typename block_type::value_type type;
513  typedef typename block_type::bid_type bid_type;
514  typedef sort_helper::trigger_entry<block_type> trigger_entry_type;
515  typedef simple_vector<trigger_entry_type> run_type;
516  typedef typename interleaved_alloc_traits<alloc_strategy>::strategy interleaved_alloc_strategy;
517 
518  unsigned_type m2 = _m / 2;
519  unsigned_type full_runs = _n / m2;
520  unsigned_type partial_runs = ((_n % m2) ? 1 : 0);
521  unsigned_type nruns = full_runs + partial_runs;
522  unsigned_type i;
523 
524  block_manager * mng = block_manager::get_instance();
525 
526  //STXXL_VERBOSE ("n=" << _n << " nruns=" << nruns << "=" << full_runs << "+" << partial_runs);
527 
528  double begin = timestamp(), after_runs_creation, end;
529 
530  run_type ** runs = new run_type *[nruns];
531 
532  for (i = 0; i < full_runs; i++)
533  runs[i] = new run_type(m2);
534 
535 
536  if (partial_runs)
537  runs[i] = new run_type(_n - full_runs * m2);
538 
539  for (i = 0; i < nruns; ++i)
540  mng->new_blocks(alloc_strategy(), make_bid_iterator(runs[i]->begin()), make_bid_iterator(runs[i]->end()));
541 
542  sort_local::create_runs<block_type,
543  run_type,
544  input_bid_iterator,
545  value_cmp>(input_bids, runs, nruns, _m, cmp);
546 
547  after_runs_creation = timestamp();
548 
549  double io_wait_after_rf = stats::get_instance()->get_io_wait_time();
550 
551  disk_queues::get_instance()->set_priority_op(request_queue::WRITE);
552 
553  const int_type merge_factor = optimal_merge_factor(nruns, _m);
554  run_type ** new_runs;
555 
556  while (nruns > 1)
557  {
558  int_type new_nruns = div_ceil(nruns, merge_factor);
559  STXXL_VERBOSE("Starting new merge phase: nruns: " << nruns <<
560  " opt_merge_factor: " << merge_factor << " m:" << _m << " new_nruns: " << new_nruns);
561 
562  new_runs = new run_type *[new_nruns];
563 
564  int_type runs_left = nruns;
565  int_type cur_out_run = 0;
566  int_type blocks_in_new_run = 0;
567 
568  while (runs_left > 0)
569  {
570  int_type runs2merge = STXXL_MIN(runs_left, merge_factor);
571  blocks_in_new_run = 0;
572  for (unsigned_type i = nruns - runs_left; i < (nruns - runs_left + runs2merge); i++)
573  blocks_in_new_run += runs[i]->size();
574 
575  // allocate run
576  new_runs[cur_out_run++] = new run_type(blocks_in_new_run);
577  runs_left -= runs2merge;
578  }
579  // allocate blocks for the new runs
580  if (cur_out_run == 1 && blocks_in_new_run == int_type(_n) && !input_bids->is_managed())
581  {
582  // if we sort a file we can reuse the input bids for the output
583  input_bid_iterator cur = input_bids;
584  for (int_type i = 0; cur != (input_bids + _n); ++cur)
585  {
586  (*new_runs[0])[i++].bid = *cur;
587  }
588 
589  bid_type & firstBID = (*new_runs[0])[0].bid;
590  if (firstBID.is_managed())
591  {
592  // the first block does not belong to the file
593  // need to reallocate it
594  mng->new_block(FR(), firstBID);
595  }
596  bid_type & lastBID = (*new_runs[0])[_n - 1].bid;
597  if (lastBID.is_managed())
598  {
599  // the first block does not belong to the file
600  // need to reallocate it
601  mng->new_block(FR(), lastBID);
602  }
603  }
604  else
605  {
606  mng->new_blocks(interleaved_alloc_strategy(new_nruns, alloc_strategy()),
607  RunsToBIDArrayAdaptor2<block_type::raw_size, run_type>(new_runs, 0, new_nruns, blocks_in_new_run),
608  RunsToBIDArrayAdaptor2<block_type::raw_size, run_type>(new_runs, _n, new_nruns, blocks_in_new_run));
609  }
610  // merge all
611  runs_left = nruns;
612  cur_out_run = 0;
613  while (runs_left > 0)
614  {
615  int_type runs2merge = STXXL_MIN(runs_left, merge_factor);
616 #if STXXL_CHECK_ORDER_IN_SORTS
617  assert((check_sorted_runs<block_type, run_type, value_cmp>(runs + nruns - runs_left, runs2merge, m2, cmp)));
618 #endif
619  STXXL_VERBOSE("Merging " << runs2merge << " runs");
620  merge_runs<block_type, run_type>(runs + nruns - runs_left,
621  runs2merge, *(new_runs + (cur_out_run++)), _m, cmp
622  );
623  runs_left -= runs2merge;
624  }
625 
626  nruns = new_nruns;
627  delete[] runs;
628  runs = new_runs;
629  }
630 
631  run_type * result = *runs;
632  delete[] runs;
633 
634  end = timestamp();
635 
636  STXXL_VERBOSE("Elapsed time : " << end - begin << " s. Run creation time: " <<
637  after_runs_creation - begin << " s");
638  STXXL_VERBOSE("Time in I/O wait(rf): " << io_wait_after_rf << " s");
639  STXXL_VERBOSE(*stats::get_instance());
640  STXXL_UNUSED(begin + after_runs_creation + end + io_wait_after_rf);
641 
642  return result;
643  }
644 }
645 
646 
692 template <typename ExtIterator_, typename StrictWeakOrdering_>
700 void sort(ExtIterator_ first, ExtIterator_ last, StrictWeakOrdering_ cmp, unsigned_type M)
701 {
702  sort_helper::verify_sentinel_strict_weak_ordering(cmp);
703 
704  typedef simple_vector<sort_helper::trigger_entry<typename ExtIterator_::block_type> > run_type;
705 
706  typedef typename ExtIterator_::vector_type::value_type value_type;
707  typedef typename ExtIterator_::block_type block_type;
708 
709  unsigned_type n = 0;
710 
711  block_manager * mng = block_manager::get_instance();
712 
713  first.flush();
714 
715  if ((last - first) * sizeof(value_type) * sort_memory_usage_factor() < M)
716  {
717  stl_in_memory_sort(first, last, cmp);
718  }
719  else
720  {
721  if (!(2 * block_type::raw_size * sort_memory_usage_factor() <= M)) {
722  throw bad_parameter("stxxl::sort(): INSUFFICIENT MEMORY provided, please increase parameter 'M'");
723  }
724 
725  if (first.block_offset())
726  {
727  if (last.block_offset()) // first and last element are
728  // not the first elements of their block
729  {
730  typename ExtIterator_::block_type * first_block = new typename ExtIterator_::block_type;
731  typename ExtIterator_::block_type * last_block = new typename ExtIterator_::block_type;
732  typename ExtIterator_::bid_type first_bid, last_bid;
733  request_ptr req;
734 
735  req = first_block->read(*first.bid());
736  mng->new_block(FR(), first_bid); // try to overlap
737  mng->new_block(FR(), last_bid);
738  req->wait();
739 
740 
741  req = last_block->read(*last.bid());
742 
743  unsigned_type i = 0;
744  for ( ; i < first.block_offset(); ++i)
745  {
746  first_block->elem[i] = cmp.min_value();
747  }
748 
749  req->wait();
750 
751 
752  req = first_block->write(first_bid);
753  for (i = last.block_offset(); i < block_type::size; ++i)
754  {
755  last_block->elem[i] = cmp.max_value();
756  }
757 
758  req->wait();
759 
760 
761  req = last_block->write(last_bid);
762 
763  n = last.bid() - first.bid() + 1;
764 
765  std::swap(first_bid, *first.bid());
766  std::swap(last_bid, *last.bid());
767 
768  req->wait();
769 
770 
771  delete first_block;
772  delete last_block;
773 
774  run_type * out =
775  sort_local::sort_blocks<
776  typename ExtIterator_::block_type,
777  typename ExtIterator_::vector_type::alloc_strategy_type,
778  typename ExtIterator_::bids_container_iterator>
779  (first.bid(), n, M / sort_memory_usage_factor() / block_type::raw_size, cmp);
780 
781 
782  first_block = new typename ExtIterator_::block_type;
783  last_block = new typename ExtIterator_::block_type;
784  typename ExtIterator_::block_type * sorted_first_block = new typename ExtIterator_::block_type;
785  typename ExtIterator_::block_type * sorted_last_block = new typename ExtIterator_::block_type;
786  request_ptr * reqs = new request_ptr[2];
787 
788  reqs[0] = first_block->read(first_bid);
789  reqs[1] = sorted_first_block->read((*(out->begin())).bid);
790 
791  reqs[0]->wait();
792  reqs[1]->wait();
793 
794  reqs[0] = last_block->read(last_bid);
795  reqs[1] = sorted_last_block->read(((*out)[out->size() - 1]).bid);
796 
797  for (i = first.block_offset(); i < block_type::size; i++)
798  {
799  first_block->elem[i] = sorted_first_block->elem[i];
800  }
801 
802  reqs[0]->wait();
803  reqs[1]->wait();
804 
805  req = first_block->write(first_bid);
806 
807  for (i = 0; i < last.block_offset(); ++i)
808  {
809  last_block->elem[i] = sorted_last_block->elem[i];
810  }
811 
812  req->wait();
813 
814  req = last_block->write(last_bid);
815 
816  mng->delete_block(out->begin()->bid);
817  mng->delete_block((*out)[out->size() - 1].bid);
818 
819  *first.bid() = first_bid;
820  *last.bid() = last_bid;
821 
822  typename run_type::iterator it = out->begin();
823  ++it;
824  typename ExtIterator_::bids_container_iterator cur_bid = first.bid();
825  ++cur_bid;
826 
827  for ( ; cur_bid != last.bid(); ++cur_bid, ++it)
828  {
829  *cur_bid = (*it).bid;
830  }
831 
832  delete first_block;
833  delete sorted_first_block;
834  delete sorted_last_block;
835  delete[] reqs;
836  delete out;
837 
838  req->wait();
839 
840 
841  delete last_block;
842  }
843  else
844  {
845  // first element is
846  // not the first element of its block
847  typename ExtIterator_::block_type * first_block = new typename ExtIterator_::block_type;
848  typename ExtIterator_::bid_type first_bid;
849  request_ptr req;
850 
851  req = first_block->read(*first.bid());
852  mng->new_block(FR(), first_bid); // try to overlap
853  req->wait();
854 
855 
856  unsigned_type i = 0;
857  for ( ; i < first.block_offset(); ++i)
858  {
859  first_block->elem[i] = cmp.min_value();
860  }
861 
862  req = first_block->write(first_bid);
863 
864  n = last.bid() - first.bid();
865 
866  std::swap(first_bid, *first.bid());
867 
868  req->wait();
869 
870 
871  delete first_block;
872 
873  run_type * out =
874  sort_local::sort_blocks<
875  typename ExtIterator_::block_type,
876  typename ExtIterator_::vector_type::alloc_strategy_type,
877  typename ExtIterator_::bids_container_iterator>
878  (first.bid(), n, M / sort_memory_usage_factor() / block_type::raw_size, cmp);
879 
880 
881  first_block = new typename ExtIterator_::block_type;
882 
883  typename ExtIterator_::block_type * sorted_first_block = new typename ExtIterator_::block_type;
884 
885  request_ptr * reqs = new request_ptr[2];
886 
887  reqs[0] = first_block->read(first_bid);
888  reqs[1] = sorted_first_block->read((*(out->begin())).bid);
889 
890  reqs[0]->wait();
891  reqs[1]->wait();
892 
893  for (i = first.block_offset(); i < block_type::size; ++i)
894  {
895  first_block->elem[i] = sorted_first_block->elem[i];
896  }
897 
898  req = first_block->write(first_bid);
899 
900  mng->delete_block(out->begin()->bid);
901 
902  *first.bid() = first_bid;
903 
904  typename run_type::iterator it = out->begin();
905  ++it;
906  typename ExtIterator_::bids_container_iterator cur_bid = first.bid();
907  ++cur_bid;
908 
909  for ( ; cur_bid != last.bid(); ++cur_bid, ++it)
910  {
911  *cur_bid = (*it).bid;
912  }
913 
914  *cur_bid = (*it).bid;
915 
916  delete sorted_first_block;
917  delete[] reqs;
918  delete out;
919 
920  req->wait();
921 
922  delete first_block;
923  }
924  }
925  else
926  {
927  if (last.block_offset()) // last is
928  // not the first element of its block
929  {
930  typename ExtIterator_::block_type * last_block = new typename ExtIterator_::block_type;
931  typename ExtIterator_::bid_type last_bid;
932  request_ptr req;
933  unsigned_type i;
934 
935  req = last_block->read(*last.bid());
936  mng->new_block(FR(), last_bid);
937  req->wait();
938 
939 
940  for (i = last.block_offset(); i < block_type::size; ++i)
941  {
942  last_block->elem[i] = cmp.max_value();
943  }
944 
945  req = last_block->write(last_bid);
946 
947  n = last.bid() - first.bid() + 1;
948 
949  std::swap(last_bid, *last.bid());
950 
951  req->wait();
952 
953 
954  delete last_block;
955 
956  run_type * out =
957  sort_local::sort_blocks<
958  typename ExtIterator_::block_type,
959  typename ExtIterator_::vector_type::alloc_strategy_type,
960  typename ExtIterator_::bids_container_iterator>
961  (first.bid(), n, M / sort_memory_usage_factor() / block_type::raw_size, cmp);
962 
963 
964  last_block = new typename ExtIterator_::block_type;
965  typename ExtIterator_::block_type * sorted_last_block = new typename ExtIterator_::block_type;
966  request_ptr * reqs = new request_ptr[2];
967 
968  reqs[0] = last_block->read(last_bid);
969  reqs[1] = sorted_last_block->read(((*out)[out->size() - 1]).bid);
970 
971  reqs[0]->wait();
972  reqs[1]->wait();
973 
974  for (i = 0; i < last.block_offset(); ++i)
975  {
976  last_block->elem[i] = sorted_last_block->elem[i];
977  }
978 
979  req = last_block->write(last_bid);
980 
981  mng->delete_block((*out)[out->size() - 1].bid);
982 
983  *last.bid() = last_bid;
984 
985  typename run_type::iterator it = out->begin();
986  typename ExtIterator_::bids_container_iterator cur_bid = first.bid();
987 
988  for ( ; cur_bid != last.bid(); ++cur_bid, ++it)
989  {
990  *cur_bid = (*it).bid;
991  }
992 
993  delete sorted_last_block;
994  delete[] reqs;
995  delete out;
996 
997  req->wait();
998 
999  delete last_block;
1000  }
1001  else
1002  {
1003  // first and last element are first elements of their of blocks
1004  n = last.bid() - first.bid();
1005 
1006  run_type * out =
1007  sort_local::sort_blocks<typename ExtIterator_::block_type,
1008  typename ExtIterator_::vector_type::alloc_strategy_type,
1009  typename ExtIterator_::bids_container_iterator>
1010  (first.bid(), n, M / sort_memory_usage_factor() / block_type::raw_size, cmp);
1011 
1012  typename run_type::iterator it = out->begin();
1013  typename ExtIterator_::bids_container_iterator cur_bid = first.bid();
1014 
1015  for ( ; cur_bid != last.bid(); ++cur_bid, ++it)
1016  {
1017  *cur_bid = (*it).bid;
1018  }
1019 
1020  delete out;
1021  }
1022  }
1023  }
1024 
1025 #if STXXL_CHECK_ORDER_IN_SORTS
1026  typedef typename ExtIterator_::const_iterator const_iterator;
1027  assert(stxxl::is_sorted(const_iterator(first), const_iterator(last), cmp));
1028 #endif
1029 }
1030 
1032 
1033 __STXXL_END_NAMESPACE
1034 
1035 #endif // !STXXL_SORT_HEADER
1036 // vim: et:ts=4:sw=4
fully randomized disk allocation scheme functor
Definition: block_alloc.h:69
Encapsulates asynchronous buffered block writing engine.
Definition: buf_writer.h:37
void sort(ExtIterator_ first, ExtIterator_ last, StrictWeakOrdering_ cmp, unsigned_type M)
Sort records comparison-based.
Definition: sort.h:700
Request with basic properties like file and offset.
Definition: request.h:39
void new_blocks(const DiskAssignFunctor &functor, BIDIteratorClass bidbegin, BIDIteratorClass bidend, unsigned_type offset=0)
Allocates new blocks.
Definition: mng.h:90
Implemented as reference counting smart pointer.
Definition: request_ptr.h:34
Encapsulates asynchronous prefetching engine.
Definition: block_prefetcher.h:54
void wait_all(request_iterator_ reqs_begin, request_iterator_ reqs_end)
Collection of functions to track statuses of a number of requests.
Definition: request_operations.h:36
void new_block(const DiskAssignFunctor &functor, BID< BLK_SIZE > &bid, unsigned_type offset=0)
Definition: mng.h:131
Provides a static class to store runtime tuning parameters.
void delete_block(const BID< BLK_SIZE > &bid)
Deallocates a block.
Definition: mng.h:204
virtual void wait(bool measure_time=true)=0
Suspends calling thread until completion of the request.
Block manager class.
Definition: mng.h:59