Stxxl  1.3.2
ksort.h
1 /***************************************************************************
2  * include/stxxl/bits/algo/ksort.h
3  *
4  * Part of the STXXL. See http://stxxl.sourceforge.net
5  *
6  * Copyright (C) 2002 Roman Dementiev <[email protected]>
7  * Copyright (C) 2008-2011 Andreas Beckmann <[email protected]>
8  *
9  * Distributed under the Boost Software License, Version 1.0.
10  * (See accompanying file LICENSE_1_0.txt or copy at
11  * http://www.boost.org/LICENSE_1_0.txt)
12  **************************************************************************/
13 
14 #ifndef STXXL_KSORT_HEADER
15 #define STXXL_KSORT_HEADER
16 
17 #include <stxxl/bits/mng/mng.h>
18 #include <stxxl/bits/common/rand.h>
19 #include <stxxl/bits/mng/adaptor.h>
20 #include <stxxl/bits/common/simple_vector.h>
21 #include <stxxl/bits/common/switch.h>
22 #include <stxxl/bits/mng/block_alloc_interleaved.h>
23 #include <stxxl/bits/algo/intksort.h>
24 #include <stxxl/bits/algo/adaptor.h>
25 #include <stxxl/bits/algo/async_schedule.h>
26 #include <stxxl/bits/mng/block_prefetcher.h>
27 #include <stxxl/bits/mng/buf_writer.h>
28 #include <stxxl/bits/algo/run_cursor.h>
29 #include <stxxl/bits/algo/losertree.h>
30 #include <stxxl/bits/algo/inmemsort.h>
31 #include <stxxl/bits/algo/sort_base.h>
32 #include <stxxl/bits/common/is_sorted.h>
33 #include <stxxl/bits/common/utils.h>
34 
35 
36 //#define INTERLEAVED_ALLOC
37 
38 #define OPT_MERGING
39 
40 __STXXL_BEGIN_NAMESPACE
41 
44 
49 
52 namespace ksort_local
53 {
54  template <typename _BIDTp, typename _KeyTp>
55  struct trigger_entry
56  {
57  typedef _BIDTp bid_type;
58  typedef _KeyTp key_type;
59 
60  bid_type bid;
61  key_type key;
62 
63  operator bid_type ()
64  {
65  return bid;
66  }
67  };
68 
69 
70  template <typename _BIDTp, typename _KeyTp>
71  inline bool operator < (const trigger_entry<_BIDTp, _KeyTp> & a,
72  const trigger_entry<_BIDTp, _KeyTp> & b)
73  {
74  return (a.key < b.key);
75  }
76 
77  template <typename _BIDTp, typename _KeyTp>
78  inline bool operator > (const trigger_entry<_BIDTp, _KeyTp> & a,
79  const trigger_entry<_BIDTp, _KeyTp> & b)
80  {
81  return (a.key > b.key);
82  }
83 
84  template <typename type, typename key_type1>
85  struct type_key
86  {
87  typedef key_type1 key_type;
88  key_type key;
89  type * ptr;
90 
91  type_key() { }
92  type_key(key_type k, type * p) : key(k), ptr(p)
93  { }
94  };
95 
96  template <typename type, typename key1>
97  bool operator < (const type_key<type, key1> & a, const type_key<type, key1> & b)
98  {
99  return a.key < b.key;
100  }
101 
102  template <typename type, typename key1>
103  bool operator > (const type_key<type, key1> & a, const type_key<type, key1> & b)
104  {
105  return a.key > b.key;
106  }
107 
108 
109  template <typename block_type, typename bid_type>
110  struct write_completion_handler
111  {
112  block_type * block;
113  bid_type bid;
114  request_ptr * req;
115  void operator () (request * /*completed_req*/)
116  {
117  * req = block->read(bid);
118  }
119  };
120 
121  template <typename type_key_,
122  typename block_type,
123  typename run_type,
124  typename input_bid_iterator,
125  typename key_extractor>
126  inline void write_out(
127  type_key_ * begin,
128  type_key_ * end,
129  block_type * & cur_blk,
130  const block_type * end_blk,
131  int_type & out_block,
132  int_type & out_pos,
133  run_type & run,
134  write_completion_handler<block_type, typename block_type::bid_type> * & next_read,
135  typename block_type::bid_type * & bids,
136  request_ptr * write_reqs,
137  request_ptr * read_reqs,
138  input_bid_iterator & it,
139  key_extractor keyobj)
140  {
141  typedef typename block_type::bid_type bid_type;
142  typedef typename block_type::type type;
143 
144  type * elem = cur_blk->elem;
145  for (type_key_ * p = begin; p < end; p++)
146  {
147  elem[out_pos++] = *(p->ptr);
148 
149  if (out_pos >= block_type::size)
150  {
151  run[out_block].key = keyobj(*(cur_blk->elem));
152 
153  if (cur_blk < end_blk)
154  {
155  next_read->block = cur_blk;
156  next_read->req = read_reqs + out_block;
157  read_reqs[out_block] = NULL;
158  bids[out_block] = next_read->bid = *(it++);
159 
160  write_reqs[out_block] = cur_blk->write(
161  run[out_block].bid,
162  // postpone the read of the next run's block
163  // until after the write of this run's block
164  *(next_read++));
165  }
166  else
167  {
168  write_reqs[out_block] = cur_blk->write(run[out_block].bid);
169  }
170 
171  cur_blk++;
172  elem = cur_blk->elem;
173  out_block++;
174  out_pos = 0;
175  }
176  }
177  }
178 
179  template <
180  typename block_type,
181  typename run_type,
182  typename input_bid_iterator,
183  typename key_extractor>
184  void
185  create_runs(
186  input_bid_iterator it,
187  run_type ** runs,
188  const unsigned_type nruns,
189  const unsigned_type m2,
190  key_extractor keyobj)
191  {
192  typedef typename block_type::value_type type;
193  typedef typename block_type::bid_type bid_type;
194  typedef typename key_extractor::key_type key_type;
195  typedef type_key<type, key_type> type_key_;
196 
197  block_manager * bm = block_manager::get_instance();
198  block_type * Blocks1 = new block_type[m2];
199  block_type * Blocks2 = new block_type[m2];
200  bid_type * bids = new bid_type[m2];
201  type_key_ * refs1 = new type_key_[m2 * Blocks1->size];
202  type_key_ * refs2 = new type_key_[m2 * Blocks1->size];
203  request_ptr * read_reqs = new request_ptr[m2];
204  request_ptr * write_reqs = new request_ptr[m2];
205  write_completion_handler<block_type, bid_type> * next_run_reads =
206  new write_completion_handler<block_type, bid_type>[m2];
207 
208  run_type * run;
209  run = *runs;
210  int_type run_size = (*runs)->size();
211  key_type offset = 0;
212  const int log_k1 = log2_ceil((m2 * block_type::size * sizeof(type_key_) / STXXL_L2_SIZE) ?
213  (m2 * block_type::size * sizeof(type_key_) / STXXL_L2_SIZE) : 2);
214  const int log_k2 = log2_floor(m2 * Blocks1->size) - log_k1 - 1;
215  STXXL_VERBOSE("log_k1: " << log_k1 << " log_k2:" << log_k2);
216  const int_type k1 = 1 << log_k1;
217  const int_type k2 = 1 << log_k2;
218  int_type * bucket1 = new int_type[k1];
219  int_type * bucket2 = new int_type[k2];
220  int_type i;
221 
222  disk_queues::get_instance()->set_priority_op(request_queue::WRITE);
223 
224  for (i = 0; i < run_size; i++)
225  {
226  bids[i] = *(it++);
227  read_reqs[i] = Blocks1[i].read(bids[i]);
228  }
229 
230  unsigned_type k = 0;
231  const int shift1 = sizeof(key_type) * 8 - log_k1;
232  const int shift2 = shift1 - log_k2;
233  STXXL_VERBOSE("shift1: " << shift1 << " shift2:" << shift2);
234 
235  for ( ; k < nruns; k++)
236  {
237  run = runs[k];
238  run_size = run->size();
239 
240  std::fill(bucket1, bucket1 + k1, 0);
241 
242  type_key_ * ref_ptr = refs1;
243  for (i = 0; i < run_size; i++)
244  {
245  if (k)
246  write_reqs[i]->wait();
247 
248  read_reqs[i]->wait();
249  bm->delete_block(bids[i]);
250 
251  classify_block(Blocks1[i].begin(), Blocks1[i].end(), ref_ptr, bucket1, offset, shift1, keyobj);
252  }
253 
254  exclusive_prefix_sum(bucket1, k1);
255  classify(refs1, refs1 + run_size * Blocks1->size, refs2, bucket1,
256  offset, shift1);
257 
258  int_type out_block = 0;
259  int_type out_pos = 0;
260  unsigned_type next_run_size = (k < nruns - 1) ? (runs[k + 1]->size()) : 0;
261 
262  // recurse on each bucket
263  type_key_ * c = refs2;
264  type_key_ * d = refs1;
265  block_type * cur_blk = Blocks2;
266  block_type * end_blk = Blocks2 + next_run_size;
267  write_completion_handler<block_type, bid_type> * next_read = next_run_reads;
268 
269  for (i = 0; i < k1; i++)
270  {
271  type_key_ * cEnd = refs2 + bucket1[i];
272  type_key_ * dEnd = refs1 + bucket1[i];
273 
274  l1sort(c, cEnd, d, bucket2, k2,
275  offset + (key_type(1) << key_type(shift1)) * key_type(i), shift2); // key_type,key_type,... paranoia
276 
277  write_out(
278  d, dEnd, cur_blk, end_blk,
279  out_block, out_pos, *run, next_read, bids,
280  write_reqs, read_reqs, it, keyobj);
281 
282  c = cEnd;
283  d = dEnd;
284  }
285 
286  std::swap(Blocks1, Blocks2);
287  }
288 
289  wait_all(write_reqs, m2);
290 
291  delete[] bucket1;
292  delete[] bucket2;
293  delete[] refs1;
294  delete[] refs2;
295  delete[] Blocks1;
296  delete[] Blocks2;
297  delete[] bids;
298  delete[] next_run_reads;
299  delete[] read_reqs;
300  delete[] write_reqs;
301  }
302 
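// ---------------------------------------------------------------------------
// Illustrative note, not part of ksort.h: create_runs() above forms each run
// with a two-level MSB radix pass (classify_block(), classify() and l1sort()
// come from intksort.h). Assuming an unsigned integral key_type, a record
// with key k first goes into top-level bucket
//
//     (k - offset) >> shift1,    with shift1 = 8 * sizeof(key_type) - log_k1,
//
// and bucket i is then refined with shift2 relative to the sub-offset
// offset + (key_type(1) << shift1) * i, matching the arguments passed to
// l1sort() in the loop above. A stand-alone sketch of that bucket index
// (msb_bucket is a hypothetical helper, not an STXXL function):
//
//     inline stxxl::unsigned_type
//     msb_bucket(stxxl::uint64 k, stxxl::uint64 offset, unsigned shift)
//     {
//         return static_cast<stxxl::unsigned_type>((k - offset) >> shift);
//     }
// ---------------------------------------------------------------------------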
303  template <typename block_type,
304  typename prefetcher_type,
305  typename key_extractor>
306  struct run_cursor2_cmp : public std::binary_function<run_cursor2<block_type, prefetcher_type>, run_cursor2<block_type, prefetcher_type>, bool>
307  {
308  typedef run_cursor2<block_type, prefetcher_type> cursor_type;
309  key_extractor keyobj;
310  run_cursor2_cmp(key_extractor keyobj_)
311  {
312  keyobj = keyobj_;
313  }
314  inline bool operator () (const cursor_type & a, const cursor_type & b) const
315  {
316  if (UNLIKELY(b.empty()))
317  return true;
318  // sentinel emulation
319  if (UNLIKELY(a.empty()))
320  return false;
321  //sentinel emulation
322 
323  return (keyobj(a.current()) < keyobj(b.current()));
324  }
325 
326  private:
327  run_cursor2_cmp() { }
328  };
329 
330 
331  template <typename record_type, typename key_extractor>
332  class key_comparison : public std::binary_function<record_type, record_type, bool>
333  {
334  key_extractor ke;
335 
336  public:
337  key_comparison() { }
338  key_comparison(key_extractor ke_) : ke(ke_) { }
339  bool operator () (const record_type & a, const record_type & b) const
340  {
341  return ke(a) < ke(b);
342  }
343  };
344 
345 
346  template <typename block_type, typename run_type, typename key_ext_>
347  bool check_ksorted_runs(run_type ** runs,
348  unsigned_type nruns,
349  unsigned_type m,
350  key_ext_ keyext)
351  {
352  typedef typename block_type::value_type value_type;
353 
354  STXXL_MSG("check_ksorted_runs Runs: " << nruns);
355  unsigned_type irun = 0;
356  for (irun = 0; irun < nruns; ++irun)
357  {
358  const unsigned_type nblocks_per_run = runs[irun]->size();
359  unsigned_type blocks_left = nblocks_per_run;
360  block_type * blocks = new block_type[m];
361  request_ptr * reqs = new request_ptr[m];
362  value_type last = keyext.min_value();
363 
364  for (unsigned_type off = 0; off < nblocks_per_run; off += m)
365  {
366  const unsigned_type nblocks = STXXL_MIN(blocks_left, m);
367  const unsigned_type nelements = nblocks * block_type::size;
368  blocks_left -= nblocks;
369 
370  for (unsigned_type j = 0; j < nblocks; ++j)
371  {
372  reqs[j] = blocks[j].read((*runs[irun])[off + j].bid);
373  }
374  wait_all(reqs, reqs + nblocks);
375 
376  if (off && (keyext(blocks[0][0]) < keyext(last)))
377  {
378  STXXL_MSG("check_sorted_runs wrong first value in the run " << irun);
379  STXXL_MSG(" first value: " << blocks[0][0] << " with key" << keyext(blocks[0][0]));
380  STXXL_MSG(" last value: " << last << " with key" << keyext(last));
381  for (unsigned_type k = 0; k < block_type::size; ++k)
382  STXXL_MSG("Element " << k << " in the block is :" << blocks[0][k] << " key: " << keyext(blocks[0][k]));
383  return false;
384  }
385 
386  for (unsigned_type j = 0; j < nblocks; ++j)
387  {
388  if (keyext(blocks[j][0]) != (*runs[irun])[off + j].key)
389  {
390  STXXL_MSG("check_sorted_runs wrong trigger in the run " << irun << " block " << (off + j));
391  STXXL_MSG(" trigger value: " << (*runs[irun])[off + j].key);
392  STXXL_MSG("Data in the block:");
393  for (unsigned_type k = 0; k < block_type::size; ++k)
394  STXXL_MSG("Element " << k << " in the block is :" << blocks[j][k] << " with key: " << keyext(blocks[j][k]));
395 
396  STXXL_MSG("BIDS:");
397  for (unsigned_type k = 0; k < nblocks; ++k)
398  {
399  if (k == j)
400  STXXL_MSG("Bad one comes next.");
401  STXXL_MSG("BID " << (off + k) << " is: " << ((*runs[irun])[off + k].bid));
402  }
403 
404  return false;
405  }
406  }
407  if (!stxxl::is_sorted(make_element_iterator(blocks, 0),
408  make_element_iterator(blocks, nelements),
409  key_comparison<value_type, key_ext_>()))
410  {
411  STXXL_MSG("check_sorted_runs wrong order in the run " << irun);
412  STXXL_MSG("Data in blocks:");
413  for (unsigned_type j = 0; j < nblocks; ++j)
414  {
415  for (unsigned_type k = 0; k < block_type::size; ++k)
416  STXXL_MSG(" Element " << k << " in block " << (off + j) << " is :" << blocks[j][k] << " with key: " << keyext(blocks[j][k]));
417  }
418  STXXL_MSG("BIDS:");
419  for (unsigned_type k = 0; k < nblocks; ++k)
420  {
421  STXXL_MSG("BID " << (k + off) << " is: " << ((*runs[irun])[k + off].bid));
422  }
423 
424  return false;
425  }
426  last = blocks[nblocks - 1][block_type::size - 1];
427  }
428 
429  assert(blocks_left == 0);
430  delete[] reqs;
431  delete[] blocks;
432  }
433 
434  return true;
435  }
436 
437 
438  template <typename block_type, typename run_type, typename key_extractor>
439  void merge_runs(run_type ** in_runs, unsigned_type nruns, run_type * out_run, unsigned_type _m, key_extractor keyobj)
440  {
441  typedef block_prefetcher<block_type, typename run_type::iterator> prefetcher_type;
442  typedef run_cursor2<block_type, prefetcher_type> run_cursor_type;
443 
444  unsigned_type i;
445  run_type consume_seq(out_run->size());
446 
447  int_type * prefetch_seq = new int_type[out_run->size()];
448 
449  typename run_type::iterator copy_start = consume_seq.begin();
450  for (i = 0; i < nruns; i++)
451  {
452  // TODO: try to avoid copy
453  copy_start = std::copy(
454  in_runs[i]->begin(),
455  in_runs[i]->end(),
456  copy_start);
457  }
458  std::stable_sort(consume_seq.begin(), consume_seq.end() _STXXL_SORT_TRIGGER_FORCE_SEQUENTIAL);
459 
460  unsigned disks_number = config::get_instance()->disks_number();
461 
462 #ifdef PLAY_WITH_OPT_PREF
463  const int_type n_write_buffers = 4 * disks_number;
464 #else
465  const int_type n_prefetch_buffers = STXXL_MAX(int_type(2 * disks_number), (3 * (int_type(_m) - int_type(nruns)) / 4));
466  STXXL_VERBOSE("Prefetch buffers " << n_prefetch_buffers);
467  const int_type n_write_buffers = STXXL_MAX(int_type(2 * disks_number), int_type(_m) - int_type(nruns) - int_type(n_prefetch_buffers));
468  STXXL_VERBOSE("Write buffers " << n_write_buffers);
469  // heuristic
470  const int_type n_opt_prefetch_buffers = 2 * int_type(disks_number) + (3 * (int_type(n_prefetch_buffers) - int_type(2 * disks_number))) / 10;
471  STXXL_VERBOSE("Prefetch buffers " << n_opt_prefetch_buffers);
472 #endif
473 
474 #if STXXL_SORT_OPTIMAL_PREFETCHING
475  compute_prefetch_schedule(
476  consume_seq,
477  prefetch_seq,
478  n_opt_prefetch_buffers,
479  disks_number);
480 #else
481  for (i = 0; i < out_run->size(); i++)
482  prefetch_seq[i] = i;
483 
484 #endif
485 
486 
487  prefetcher_type prefetcher(consume_seq.begin(),
488  consume_seq.end(),
489  prefetch_seq,
490  nruns + n_prefetch_buffers);
491 
492  buffered_writer<block_type> writer(n_write_buffers, n_write_buffers / 2);
493 
494  unsigned_type out_run_size = out_run->size();
495 
496  run_cursor2_cmp<block_type, prefetcher_type, key_extractor> cmp(keyobj);
497  loser_tree<
498  run_cursor_type,
499  run_cursor2_cmp<block_type, prefetcher_type, key_extractor> >
500  losers(&prefetcher, nruns, cmp);
501 
502  block_type * out_buffer = writer.get_free_block();
503 
504  for (i = 0; i < out_run_size; i++)
505  {
506  losers.multi_merge(out_buffer->elem, out_buffer->elem + block_type::size);
507  (*out_run)[i].key = keyobj(out_buffer->elem[0]);
508  out_buffer = writer.write(out_buffer, (*out_run)[i].bid);
509  }
510 
511  delete[] prefetch_seq;
512 
513  block_manager * bm = block_manager::get_instance();
514  for (i = 0; i < nruns; i++)
515  {
516  unsigned_type sz = in_runs[i]->size();
517  for (unsigned_type j = 0; j < sz; j++)
518  bm->delete_block((*in_runs[i])[j].bid);
519 
520  delete in_runs[i];
521  }
522  }
523 
524 
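// ---------------------------------------------------------------------------
// Illustrative note, not part of ksort.h: ksort_blocks() below is a two-phase
// external sort. Phase 1 (create_runs) sorts chunks of roughly _m/2 blocks at
// a time (reduced further to leave room for the type_key arrays) and writes
// them out as sorted runs; phase 2 repeatedly merges up to
// optimal_merge_factor(nruns, _m) runs per pass until a single run remains.
// As a hypothetical worked example: with _n = 100000 input blocks and a run
// length of m2 = 512 blocks, phase 1 yields nruns = div_ceil(100000, 512)
// = 196 runs, which a memory budget _m well above 196 blocks can typically
// merge in a single pass.
// ---------------------------------------------------------------------------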
525  template <typename block_type,
526  typename alloc_strategy,
527  typename input_bid_iterator,
528  typename key_extractor>
529 
530  simple_vector<trigger_entry<typename block_type::bid_type, typename key_extractor::key_type> > *
531  ksort_blocks(input_bid_iterator input_bids, unsigned_type _n, unsigned_type _m, key_extractor keyobj)
532  {
533  typedef typename block_type::value_type type;
534  typedef typename key_extractor::key_type key_type;
535  typedef typename block_type::bid_type bid_type;
536  typedef trigger_entry<bid_type, typename key_extractor::key_type> trigger_entry_type;
537  typedef simple_vector<trigger_entry_type> run_type;
538  typedef typename interleaved_alloc_traits<alloc_strategy>::strategy interleaved_alloc_strategy;
539 
540  unsigned_type m2 = div_ceil(_m, 2);
541  const unsigned_type m2_rf = m2 * block_type::raw_size /
542  (block_type::raw_size + block_type::size * sizeof(type_key<type, key_type>));
543  STXXL_VERBOSE("Reducing number of blocks in a run from " << m2 << " to " <<
544  m2_rf << " due to key size: " << sizeof(typename key_extractor::key_type) << " bytes");
545  m2 = m2_rf;
546  unsigned_type full_runs = _n / m2;
547  unsigned_type partial_runs = ((_n % m2) ? 1 : 0);
548  unsigned_type nruns = full_runs + partial_runs;
549  unsigned_type i;
550 
551  block_manager * mng = block_manager::get_instance();
552 
553  STXXL_VERBOSE("n=" << _n << " nruns=" << nruns << "=" << full_runs << "+" << partial_runs);
554 
555  double begin = timestamp(), after_runs_creation, end;
556 
557  run_type ** runs = new run_type *[nruns];
558 
559  for (i = 0; i < full_runs; i++)
560  runs[i] = new run_type(m2);
561 
562 
563 #ifdef INTERLEAVED_ALLOC
564  if (partial_runs)
565  {
566  unsigned_type last_run_size = _n - full_runs * m2;
567  runs[i] = new run_type(last_run_size);
568 
569  mng->new_blocks(interleaved_alloc_strategy(nruns, alloc_strategy()),
570  RunsToBIDArrayAdaptor2<block_type::raw_size, run_type>
571  (runs, 0, nruns, last_run_size),
572  RunsToBIDArrayAdaptor2<block_type::raw_size, run_type>
573  (runs, _n, nruns, last_run_size));
574  }
575  else
576  mng->new_blocks(interleaved_alloc_strategy(nruns, alloc_strategy()),
577  RunsToBIDArrayAdaptor<block_type::raw_size, run_type>
578  (runs, 0, nruns),
579  RunsToBIDArrayAdaptor<block_type::raw_size, run_type>
580  (runs, _n, nruns));
581 
582 #else
583  if (partial_runs)
584  runs[i] = new run_type(_n - full_runs * m2);
585 
586  for (i = 0; i < nruns; i++)
587  {
588  mng->new_blocks(alloc_strategy(), make_bid_iterator(runs[i]->begin()), make_bid_iterator(runs[i]->end()));
589  }
590 #endif
591 
592  create_runs<block_type,
593  run_type,
594  input_bid_iterator,
595  key_extractor>(input_bids, runs, nruns, m2, keyobj);
596 
597  after_runs_creation = timestamp();
598 
599  double io_wait_after_rf = stats::get_instance()->get_io_wait_time();
600 
601  disk_queues::get_instance()->set_priority_op(request_queue::WRITE);
602 
603  const int_type merge_factor = optimal_merge_factor(nruns, _m);
604  run_type ** new_runs;
605 
606  while (nruns > 1)
607  {
608  int_type new_nruns = div_ceil(nruns, merge_factor);
609  STXXL_VERBOSE("Starting new merge phase: nruns: " << nruns <<
610  " opt_merge_factor: " << merge_factor << " m:" << _m << " new_nruns: " << new_nruns);
611 
612  new_runs = new run_type *[new_nruns];
613 
614  int_type runs_left = nruns;
615  int_type cur_out_run = 0;
616  int_type blocks_in_new_run = 0;
617 
618  while (runs_left > 0)
619  {
620  int_type runs2merge = STXXL_MIN(runs_left, merge_factor);
621  blocks_in_new_run = 0;
622  for (unsigned_type i = nruns - runs_left; i < (nruns - runs_left + runs2merge); i++)
623  blocks_in_new_run += runs[i]->size();
624 
625  // allocate run
626  new_runs[cur_out_run++] = new run_type(blocks_in_new_run);
627  runs_left -= runs2merge;
628  }
629  // allocate blocks in the new runs
630  if (cur_out_run == 1 && blocks_in_new_run == int_type(_n) && !input_bids->is_managed())
631  {
632  // if we sort a file we can reuse the input bids for the output
633  input_bid_iterator cur = input_bids;
634  for (int_type i = 0; cur != (input_bids + _n); ++cur)
635  {
636  (*new_runs[0])[i++].bid = *cur;
637  }
638 
639  bid_type & firstBID = (*new_runs[0])[0].bid;
640  if (firstBID.is_managed())
641  {
642  // the first block does not belong to the file
643  // need to reallocate it
644  mng->new_block(FR(), firstBID);
645  }
646  bid_type & lastBID = (*new_runs[0])[_n - 1].bid;
647  if (lastBID.is_managed())
648  {
649  // the last block does not belong to the file
650  // need to reallocate it
651  mng->new_block(FR(), lastBID);
652  }
653  }
654  else
655  {
656  mng->new_blocks(interleaved_alloc_strategy(new_nruns, alloc_strategy()),
657  RunsToBIDArrayAdaptor2<block_type::raw_size, run_type>(new_runs, 0, new_nruns, blocks_in_new_run),
658  RunsToBIDArrayAdaptor2<block_type::raw_size, run_type>(new_runs, _n, new_nruns, blocks_in_new_run));
659  }
660 
661 
662  // merge all
663  runs_left = nruns;
664  cur_out_run = 0;
665  while (runs_left > 0)
666  {
667  int_type runs2merge = STXXL_MIN(runs_left, merge_factor);
668 #if STXXL_CHECK_ORDER_IN_SORTS
669  assert((check_ksorted_runs<block_type, run_type, key_extractor>(runs + nruns - runs_left, runs2merge, m2, keyobj)));
670 #endif
671  STXXL_VERBOSE("Merging " << runs2merge << " runs");
672  merge_runs<block_type, run_type, key_extractor>(runs + nruns - runs_left,
673  runs2merge, *(new_runs + (cur_out_run++)), _m, keyobj);
674  runs_left -= runs2merge;
675  }
676 
677  nruns = new_nruns;
678  delete[] runs;
679  runs = new_runs;
680  }
681 
682  run_type * result = *runs;
683  delete[] runs;
684 
685  end = timestamp();
686 
687  STXXL_VERBOSE("Elapsed time : " << end - begin << " s. Run creation time: " <<
688  after_runs_creation - begin << " s");
689  STXXL_VERBOSE("Time in I/O wait(rf): " << io_wait_after_rf << " s");
690  STXXL_VERBOSE(*stats::get_instance());
691  STXXL_UNUSED(begin + after_runs_creation + end + io_wait_after_rf);
692 
693  return result;
694  }
695 }
696 
697 
736 template <typename ExtIterator_, typename KeyExtractor_>
743 void ksort(ExtIterator_ first_, ExtIterator_ last_, KeyExtractor_ keyobj, unsigned_type M__)
744 {
745  typedef simple_vector<ksort_local::trigger_entry<typename ExtIterator_::bid_type,
746  typename KeyExtractor_::key_type> > run_type;
747  typedef typename ExtIterator_::vector_type::value_type value_type;
748  typedef typename ExtIterator_::block_type block_type;
749 
750 
751  unsigned_type n = 0;
752  block_manager * mng = block_manager::get_instance();
753 
754  first_.flush();
755 
756  if ((last_ - first_) * sizeof(value_type) < M__)
757  {
758  stl_in_memory_sort(first_, last_, ksort_local::key_comparison<value_type, KeyExtractor_>(keyobj));
759  }
760  else
761  {
762  assert(2 * block_type::raw_size <= M__);
763 
764  if (first_.block_offset())
765  {
766  if (last_.block_offset()) // first and last element reside
767  // not in the beginning of the block
768  {
769  typename ExtIterator_::block_type * first_block = new typename ExtIterator_::block_type;
770  typename ExtIterator_::block_type * last_block = new typename ExtIterator_::block_type;
771  typename ExtIterator_::bid_type first_bid, last_bid;
772  request_ptr req;
773 
774  req = first_block->read(*first_.bid());
775  mng->new_block(FR(), first_bid); // try to overlap
776  mng->new_block(FR(), last_bid);
777  req->wait();
778 
779 
780  req = last_block->read(*last_.bid());
781 
782  unsigned_type i = 0;
783  for ( ; i < first_.block_offset(); i++)
784  {
785  first_block->elem[i] = keyobj.min_value();
786  }
787 
788  req->wait();
789 
790  req = first_block->write(first_bid);
791  for (i = last_.block_offset(); i < block_type::size; i++)
792  {
793  last_block->elem[i] = keyobj.max_value();
794  }
795 
796  req->wait();
797 
798  req = last_block->write(last_bid);
799 
800  n = last_.bid() - first_.bid() + 1;
801 
802  std::swap(first_bid, *first_.bid());
803  std::swap(last_bid, *last_.bid());
804 
805  req->wait();
806 
807  delete first_block;
808  delete last_block;
809 
810  run_type * out =
811  ksort_local::ksort_blocks<
812  typename ExtIterator_::block_type,
813  typename ExtIterator_::vector_type::alloc_strategy_type,
814  typename ExtIterator_::bids_container_iterator,
815  KeyExtractor_>
816  (first_.bid(), n, M__ / block_type::raw_size, keyobj);
817 
818 
819  first_block = new typename ExtIterator_::block_type;
820  last_block = new typename ExtIterator_::block_type;
821  typename ExtIterator_::block_type * sorted_first_block = new typename ExtIterator_::block_type;
822  typename ExtIterator_::block_type * sorted_last_block = new typename ExtIterator_::block_type;
823  request_ptr * reqs = new request_ptr[2];
824 
825  reqs[0] = first_block->read(first_bid);
826  reqs[1] = sorted_first_block->read((*(out->begin())).bid);
827  wait_all(reqs, 2);
828 
829  reqs[0] = last_block->read(last_bid);
830  reqs[1] = sorted_last_block->read(((*out)[out->size() - 1]).bid);
831 
832  for (i = first_.block_offset(); i < block_type::size; i++)
833  {
834  first_block->elem[i] = sorted_first_block->elem[i];
835  }
836  wait_all(reqs, 2);
837 
838  req = first_block->write(first_bid);
839 
840  for (i = 0; i < last_.block_offset(); i++)
841  {
842  last_block->elem[i] = sorted_last_block->elem[i];
843  }
844 
845  req->wait();
846 
847 
848  req = last_block->write(last_bid);
849 
850  mng->delete_block(out->begin()->bid);
851  mng->delete_block((*out)[out->size() - 1].bid);
852 
853  *first_.bid() = first_bid;
854  *last_.bid() = last_bid;
855 
856  typename run_type::iterator it = out->begin();
857  it++;
858  typename ExtIterator_::bids_container_iterator cur_bid = first_.bid();
859  cur_bid++;
860 
861  for ( ; cur_bid != last_.bid(); cur_bid++, it++)
862  {
863  *cur_bid = (*it).bid;
864  }
865 
866  delete first_block;
867  delete sorted_first_block;
868  delete sorted_last_block;
869  delete[] reqs;
870  delete out;
871 
872  req->wait();
873 
874  delete last_block;
875  }
876  else
877  {
878  // first element resides
879  // not in the beginning of the block
880 
881  typename ExtIterator_::block_type * first_block = new typename ExtIterator_::block_type;
882  typename ExtIterator_::bid_type first_bid;
883  request_ptr req;
884 
885  req = first_block->read(*first_.bid());
886  mng->new_block(FR(), first_bid); // try to overlap
887  req->wait();
888 
889 
890  unsigned_type i = 0;
891  for ( ; i < first_.block_offset(); i++)
892  {
893  first_block->elem[i] = keyobj.min_value();
894  }
895 
896  req = first_block->write(first_bid);
897 
898  n = last_.bid() - first_.bid();
899 
900  std::swap(first_bid, *first_.bid());
901 
902  req->wait();
903 
904  delete first_block;
905 
906  run_type * out =
907  ksort_local::ksort_blocks<
908  typename ExtIterator_::block_type,
909  typename ExtIterator_::vector_type::alloc_strategy_type,
910  typename ExtIterator_::bids_container_iterator,
911  KeyExtractor_>
912  (first_.bid(), n, M__ / block_type::raw_size, keyobj);
913 
914 
915  first_block = new typename ExtIterator_::block_type;
916 
917  typename ExtIterator_::block_type * sorted_first_block = new typename ExtIterator_::block_type;
918 
919  request_ptr * reqs = new request_ptr[2];
920 
921  reqs[0] = first_block->read(first_bid);
922  reqs[1] = sorted_first_block->read((*(out->begin())).bid);
923  wait_all(reqs, 2);
924 
925  for (i = first_.block_offset(); i < block_type::size; i++)
926  {
927  first_block->elem[i] = sorted_first_block->elem[i];
928  }
929 
930  req = first_block->write(first_bid);
931 
932  mng->delete_block(out->begin()->bid);
933 
934  *first_.bid() = first_bid;
935 
936  typename run_type::iterator it = out->begin();
937  it++;
938  typename ExtIterator_::bids_container_iterator cur_bid = first_.bid();
939  cur_bid++;
940 
941  for ( ; cur_bid != last_.bid(); cur_bid++, it++)
942  {
943  *cur_bid = (*it).bid;
944  }
945 
946  *cur_bid = (*it).bid;
947 
948  delete sorted_first_block;
949  delete[] reqs;
950  delete out;
951 
952  req->wait();
953 
954  delete first_block;
955  }
956  }
957  else
958  {
959  if (last_.block_offset()) // last element resides
960  // not in the beginning of the block
961  {
962  typename ExtIterator_::block_type * last_block = new typename ExtIterator_::block_type;
963  typename ExtIterator_::bid_type last_bid;
964  request_ptr req;
965  unsigned_type i;
966 
967  req = last_block->read(*last_.bid());
968  mng->new_block(FR(), last_bid);
969  req->wait();
970 
971  for (i = last_.block_offset(); i < block_type::size; i++)
972  {
973  last_block->elem[i] = keyobj.max_value();
974  }
975 
976  req = last_block->write(last_bid);
977 
978  n = last_.bid() - first_.bid() + 1;
979 
980  std::swap(last_bid, *last_.bid());
981 
982  req->wait();
983 
984  delete last_block;
985 
986  run_type * out =
987  ksort_local::ksort_blocks<
988  typename ExtIterator_::block_type,
989  typename ExtIterator_::vector_type::alloc_strategy_type,
990  typename ExtIterator_::bids_container_iterator,
991  KeyExtractor_>
992  (first_.bid(), n, M__ / block_type::raw_size, keyobj);
993 
994 
995  last_block = new typename ExtIterator_::block_type;
996  typename ExtIterator_::block_type * sorted_last_block = new typename ExtIterator_::block_type;
997  request_ptr * reqs = new request_ptr[2];
998 
999  reqs[0] = last_block->read(last_bid);
1000  reqs[1] = sorted_last_block->read(((*out)[out->size() - 1]).bid);
1001  wait_all(reqs, 2);
1002 
1003  for (i = 0; i < last_.block_offset(); i++)
1004  {
1005  last_block->elem[i] = sorted_last_block->elem[i];
1006  }
1007 
1008  req = last_block->write(last_bid);
1009 
1010  mng->delete_block((*out)[out->size() - 1].bid);
1011 
1012  *last_.bid() = last_bid;
1013 
1014  typename run_type::iterator it = out->begin();
1015  typename ExtIterator_::bids_container_iterator cur_bid = first_.bid();
1016 
1017  for ( ; cur_bid != last_.bid(); cur_bid++, it++)
1018  {
1019  *cur_bid = (*it).bid;
1020  }
1021 
1022  delete sorted_last_block;
1023  delete[] reqs;
1024  delete out;
1025 
1026  req->wait();
1027 
1028  delete last_block;
1029  }
1030  else
1031  {
1032  // first and last element reside in the beginning of blocks
1033  n = last_.bid() - first_.bid();
1034 
1035  run_type * out =
1036  ksort_local::ksort_blocks<
1037  typename ExtIterator_::block_type,
1038  typename ExtIterator_::vector_type::alloc_strategy_type,
1039  typename ExtIterator_::bids_container_iterator,
1040  KeyExtractor_>
1041  (first_.bid(), n, M__ / block_type::raw_size, keyobj);
1042 
1043  typename run_type::iterator it = out->begin();
1044  typename ExtIterator_::bids_container_iterator cur_bid = first_.bid();
1045 
1046  for ( ; cur_bid != last_.bid(); cur_bid++, it++)
1047  {
1048  *cur_bid = (*it).bid;
1049  }
1050 
1051  delete out;
1052  }
1053  }
1054  }
1055 
1056 #if STXXL_CHECK_ORDER_IN_SORTS
1057  typedef typename ExtIterator_::const_iterator const_iterator;
1058  assert(stxxl::is_sorted(const_iterator(first_), const_iterator(last_),
1059  ksort_local::key_comparison<value_type, KeyExtractor_>()));
1060 #endif
1061 }
1062 
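// ---------------------------------------------------------------------------
// Illustrative sketch, not part of ksort.h: a key extractor suitable for the
// four-argument ksort() above. The edge record and its weight field are
// hypothetical; the requirements visible from this header are a nested
// integral key_type, an operator() returning a record's key, and
// min_value()/max_value() returning records whose keys bound every key in
// the input from below and above (assumes <limits> and stxxl's integer
// typedefs):
//
//     struct edge
//     {
//         stxxl::uint64 src, dst, weight;
//         explicit edge(stxxl::uint64 w = 0) : src(0), dst(0), weight(w) { }
//     };
//
//     struct get_weight
//     {
//         typedef stxxl::uint64 key_type;
//         key_type operator () (const edge & e) const { return e.weight; }
//         edge min_value() const { return edge(std::numeric_limits<key_type>::min()); }
//         edge max_value() const { return edge(std::numeric_limits<key_type>::max()); }
//     };
// ---------------------------------------------------------------------------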
1063 template <typename record_type>
1064 struct ksort_defaultkey
1065 {
1066  typedef typename record_type::key_type key_type;
1067  key_type operator () (const record_type & obj) const
1068  {
1069  return obj.key();
1070  }
1071  record_type max_value() const
1072  {
1073  return record_type::max_value();
1074  }
1075  record_type min_value() const
1076  {
1077  return record_type::min_value();
1078  }
1079 };
1080 
1081 
1087 
1096 template <typename ExtIterator_>
1097 void ksort(ExtIterator_ first_, ExtIterator_ last_, unsigned_type M__)
1098 {
1099  ksort(first_, last_,
1100  ksort_defaultkey<typename ExtIterator_::vector_type::value_type>(), M__);
1101 }
1102 
1103 
1105 
1106 __STXXL_END_NAMESPACE
1107 
1108 #endif // !STXXL_KSORT_HEADER
1109 // vim: et:ts=4:sw=4
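In practice the header is used through the usual wrapper headers together with an external stxxl::vector rather than by calling the ksort_local routines directly. The following minimal, self-contained sketch is not taken from the STXXL sources; the record layout, the vector size and the 256 MiB memory budget M are illustrative assumptions. It fills an external vector with descending integer keys and sorts it with the default key extractor (ksort_defaultkey):

#include <limits>

#include <stxxl/vector>
#include <stxxl/ksort>

// record interface expected by ksort_defaultkey: nested integral key_type,
// a key() accessor, and static min_value()/max_value() sentinels
struct my_record
{
    typedef stxxl::uint64 key_type;

    key_type m_key;

    my_record(key_type k = 0) : m_key(k) { }

    key_type key() const { return m_key; }

    static my_record min_value()
    { return my_record(std::numeric_limits<key_type>::min()); }

    static my_record max_value()
    { return my_record(std::numeric_limits<key_type>::max()); }
};

int main()
{
    const stxxl::unsigned_type M = 256 * 1024 * 1024;  // internal memory budget in bytes

    stxxl::vector<my_record> v(16 * 1024 * 1024);
    for (stxxl::uint64 i = 0; i < v.size(); ++i)
        v[i] = my_record(v.size() - i);                 // keys in descending order

    // three-argument overload: uses ksort_defaultkey<my_record>;
    // the four-argument overload takes an explicit key extractor instead
    // (see the get_weight sketch before ksort_defaultkey for the interface)
    stxxl::ksort(v.begin(), v.end(), M);

    return 0;
}

As the assert in ksort() shows, M must be at least twice block_type::raw_size; a larger budget gives longer initial runs and fewer merge passes.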