STXXL 1.3.2
stable_ksort.h
/***************************************************************************
 *  include/stxxl/bits/algo/stable_ksort.h
 *
 *  Part of the STXXL. See http://stxxl.sourceforge.net
 *
 *  Copyright (C) 2003 Roman Dementiev <[email protected]>
 *  Copyright (C) 2008 Andreas Beckmann <[email protected]>
 *
 *  Distributed under the Boost Software License, Version 1.0.
 *  (See accompanying file LICENSE_1_0.txt or copy at
 *  http://www.boost.org/LICENSE_1_0.txt)
 **************************************************************************/

#ifndef STXXL_STABLE_KSORT_HEADER
#define STXXL_STABLE_KSORT_HEADER

// This is a first attempt: distribution sort without sampling;
// stable_ksort will be reworked when time permits.


#include <stxxl/bits/mng/mng.h>
#include <stxxl/bits/mng/buf_istream.h>
#include <stxxl/bits/mng/buf_ostream.h>
#include <stxxl/bits/common/simple_vector.h>
#include <stxxl/bits/algo/intksort.h>
#include <stxxl/bits/algo/sort_base.h>
#include <stxxl/bits/common/utils.h>


#ifndef STXXL_VERBOSE_STABLE_KSORT
#define STXXL_VERBOSE_STABLE_KSORT STXXL_VERBOSE1
#endif

__STXXL_BEGIN_NAMESPACE

namespace stable_ksort_local
{
    template <class type_, class type_key>
    void classify_block(type_ * begin, type_ * end, type_key * & out, int_type * bucket, unsigned_type offset, unsigned shift)
    {
        for (type_ * p = begin; p < end; p++, out++)    // count & create references
        {
            out->ptr = p;
            typename type_::key_type key = p->key();
            int_type ibucket = (key - offset) >> shift;
            out->key = key;
            bucket[ibucket]++;
        }
    }
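
    // A worked instance of the bucket formula above (illustrative values,
    // not part of the original header): with 64-bit keys, offset = 0 and
    // shift = 56, classify_block spreads records over 2^8 = 256 buckets by
    // the most significant byte of the key:
    //     key = 0x3A00000000000000  ->  ibucket = (key - 0) >> 56 = 0x3A = 58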

    template <typename type>
    struct type_key
    {
        typedef typename type::key_type key_type;
        key_type key;
        type * ptr;

        type_key() { }
        type_key(key_type k, type * p) : key(k), ptr(p)
        { }
    };

    template <typename type>
    bool operator < (const type_key<type> & a, const type_key<type> & b)
    {
        return a.key < b.key;
    }

    template <typename type>
    bool operator > (const type_key<type> & a, const type_key<type> & b)
    {
        return a.key > b.key;
    }


    template <typename BIDType_, typename AllocStrategy_>
    class bid_sequence
    {
    public:
        typedef BIDType_ bid_type;
        typedef bid_type & reference;
        typedef AllocStrategy_ alloc_strategy;
        typedef typename simple_vector<bid_type>::size_type size_type;
        typedef typename simple_vector<bid_type>::iterator iterator;

    protected:
        simple_vector<bid_type> * bids;
        alloc_strategy alloc_strategy_;

    public:
        bid_sequence() { }
        bid_sequence(size_type size_)
        {
            bids = new simple_vector<bid_type>(size_);
            block_manager * mng = block_manager::get_instance();
            mng->new_blocks(alloc_strategy_, bids->begin(), bids->end());
        }
        void init(size_type size_)
        {
            bids = new simple_vector<bid_type>(size_);
            block_manager * mng = block_manager::get_instance();
            mng->new_blocks(alloc_strategy_, bids->begin(), bids->end());
        }
        reference operator [] (size_type i)
        {
            size_type size_ = size();   // cache size in a register
            if (i < size_)
                return *(bids->begin() + i);

            block_manager * mng = block_manager::get_instance();
            simple_vector<bid_type> * larger_bids = new simple_vector<bid_type>((i + 1) * 2);
            std::copy(bids->begin(), bids->end(), larger_bids->begin());
            mng->new_blocks(alloc_strategy_, larger_bids->begin() + size_, larger_bids->end());
            delete bids;
            bids = larger_bids;
            return *(larger_bids->begin() + i);
        }
        size_type size() { return bids->size(); }
        iterator begin() { return bids->begin(); }
        ~bid_sequence()
        {
            block_manager::get_instance()->delete_blocks(bids->begin(), bids->end());
            delete bids;
        }
    };
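
    // Note on bid_sequence: the default constructor leaves the sequence
    // uninitialized, so init() must be called before first use (as
    // stable_ksort below does). operator[] past the current size grows the
    // sequence to (i + 1) * 2 entries and allocates fresh disk blocks for
    // the new tail, so a bucket that overflows its estimated size obtains
    // more blocks automatically at amortized constant cost per block.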

    template <typename ExtIterator_>
    void distribute(
        bid_sequence<typename ExtIterator_::vector_type::block_type::bid_type,
                     typename ExtIterator_::vector_type::alloc_strategy_type> * bucket_bids,
        int64 * bucket_sizes,
        const int_type nbuckets,
        const int_type lognbuckets,
        ExtIterator_ first,
        ExtIterator_ last,
        const int_type nread_buffers,
        const int_type nwrite_buffers)
    {
        typedef typename ExtIterator_::vector_type::value_type value_type;
        typedef typename value_type::key_type key_type;
        typedef typename ExtIterator_::block_type block_type;
        typedef typename block_type::bid_type bid_type;
        typedef buf_istream<typename ExtIterator_::block_type,
                            typename ExtIterator_::bids_container_iterator> buf_istream_type;

        int_type i = 0;

        buf_istream_type in(first.bid(), last.bid() + ((first.block_offset()) ? 1 : 0),
                            nread_buffers);

        buffered_writer<block_type> out(
            nbuckets + nwrite_buffers,
            nwrite_buffers);

        unsigned_type * bucket_block_offsets = new unsigned_type[nbuckets];
        unsigned_type * bucket_iblock = new unsigned_type[nbuckets];
        block_type ** bucket_blocks = new block_type *[nbuckets];

        std::fill(bucket_sizes, bucket_sizes + nbuckets, 0);
        std::fill(bucket_iblock, bucket_iblock + nbuckets, 0);
        std::fill(bucket_block_offsets, bucket_block_offsets + nbuckets, 0);

        for (i = 0; i < nbuckets; i++)
            bucket_blocks[i] = out.get_free_block();


        ExtIterator_ cur = first - first.block_offset();

        // skip the part of the first block that lies before 'first'
        for ( ; cur != first; cur++)
            ++in;


        const int_type shift = sizeof(key_type) * 8 - lognbuckets;
        // search in the range [_begin,_end)
        STXXL_VERBOSE_STABLE_KSORT("Shift by: " << shift << " bits, lognbuckets: " << lognbuckets);
        for ( ; cur != last; cur++)
        {
            key_type cur_key = in.current().key();
            int_type ibucket = cur_key >> shift;

            int_type block_offset = bucket_block_offsets[ibucket];
            in >> (bucket_blocks[ibucket]->elem[block_offset++]);
            if (block_offset == block_type::size)
            {
                block_offset = 0;
                int_type iblock = bucket_iblock[ibucket]++;
                bucket_blocks[ibucket] = out.write(bucket_blocks[ibucket], bucket_bids[ibucket][iblock]);
            }
            bucket_block_offsets[ibucket] = block_offset;
        }
        for (i = 0; i < nbuckets; i++)
        {
            if (bucket_block_offsets[i])
            {
                out.write(bucket_blocks[i], bucket_bids[i][bucket_iblock[i]]);
            }
            bucket_sizes[i] = int64(block_type::size) * bucket_iblock[i] +
                              bucket_block_offsets[i];
            STXXL_VERBOSE_STABLE_KSORT("Bucket " << i << " has size " << bucket_sizes[i] <<
                                       ", estimated size: " << ((last - first) / int64(nbuckets)));
        }

        delete[] bucket_blocks;
        delete[] bucket_block_offsets;
        delete[] bucket_iblock;
    }
}
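
// A numeric sketch of the distribution pass above (illustrative values, not
// taken from the header): with 64-bit keys and lognbuckets = 9, distribute()
// uses shift = 64 - 9 = 55 and routes each record to bucket (key >> 55),
// i.e. by the top 9 bits of its key. Every key in bucket i is therefore
// smaller than every key in bucket i+1, so sorting each bucket independently
// and writing the buckets back in order yields a globally sorted sequence.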

//! \brief Sort records with integer keys
//! \param first object of model of \c ext_random_access_iterator concept
//! \param last object of model of \c ext_random_access_iterator concept
//! \param M amount of memory for internal use (in bytes)
//! \remark Elements must provide a method key() which returns the integer key.
//! \remark Not yet fully implemented: it assumes that the keys are uniformly
//! distributed between [0, (std::numeric_limits<key_type>::max)()].
template <typename ExtIterator_>
void stable_ksort(ExtIterator_ first, ExtIterator_ last, unsigned_type M)
{
    STXXL_MSG("Warning: stable_ksort is not yet fully implemented, it assumes that the keys are uniformly distributed between [0,(std::numeric_limits<key_type>::max)()]");
    typedef typename ExtIterator_::vector_type::value_type value_type;
    typedef typename value_type::key_type key_type;
    typedef typename ExtIterator_::block_type block_type;
    typedef typename block_type::bid_type bid_type;
    typedef typename ExtIterator_::vector_type::alloc_strategy_type alloc_strategy;
    typedef stable_ksort_local::bid_sequence<bid_type, alloc_strategy> bucket_bids_type;
    typedef stable_ksort_local::type_key<value_type> type_key_;

    first.flush();      // flush container

    double begin = timestamp();

    unsigned_type i = 0;
    config * cfg = config::get_instance();
    const unsigned_type m = M / block_type::raw_size;
    assert(2 * block_type::raw_size <= M);
    const unsigned_type write_buffers_multiple = 2;
    const unsigned_type read_buffers_multiple = 2;
    const unsigned_type ndisks = cfg->disks_number();
    const unsigned_type min_num_read_write_buffers = (write_buffers_multiple + read_buffers_multiple) * ndisks;
    const unsigned_type nmaxbuckets = m - min_num_read_write_buffers;
    const unsigned_type lognbuckets = log2_floor(nmaxbuckets);
    const unsigned_type nbuckets = 1 << lognbuckets;
    const unsigned_type est_bucket_size = div_ceil((last - first) / nbuckets, block_type::size);        // in blocks

    if (m < min_num_read_write_buffers + 2 || nbuckets < 2) {
        STXXL_ERRMSG("stxxl::stable_ksort: Not enough memory. Blocks available: " << m <<
                     ", required for r/w buffers: " << min_num_read_write_buffers <<
                     ", required for buckets: 2, nbuckets: " << nbuckets);
        throw bad_parameter("stxxl::stable_ksort(): INSUFFICIENT MEMORY provided, please increase parameter 'M'");
    }
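
    // A numeric instance of this memory budget (illustrative values:
    // M = 512 MiB, 2 MiB blocks, 4 disks):
    //     m = 512 MiB / 2 MiB = 256 blocks
    //     min_num_read_write_buffers = (2 + 2) * 4 = 16
    //     nmaxbuckets = 256 - 16 = 240  ->  lognbuckets = 7, nbuckets = 128
    //     nread_buffers = nwrite_buffers = (256 - 128) * 2 / (2 + 2) = 64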
    STXXL_VERBOSE_STABLE_KSORT("Elements to sort: " << (last - first));
    STXXL_VERBOSE_STABLE_KSORT("Number of buckets has to be reduced from " << nmaxbuckets << " to " << nbuckets);
    const unsigned_type nread_buffers = (m - nbuckets) * read_buffers_multiple / (read_buffers_multiple + write_buffers_multiple);
    const unsigned_type nwrite_buffers = (m - nbuckets) * write_buffers_multiple / (read_buffers_multiple + write_buffers_multiple);

    STXXL_VERBOSE_STABLE_KSORT("Read buffers in distribution phase: " << nread_buffers);
    STXXL_VERBOSE_STABLE_KSORT("Write buffers in distribution phase: " << nwrite_buffers);

    bucket_bids_type * bucket_bids = new bucket_bids_type[nbuckets];
    for (i = 0; i < nbuckets; ++i)
        bucket_bids[i].init(est_bucket_size);

    int64 * bucket_sizes = new int64[nbuckets];

    disk_queues::get_instance()->set_priority_op(request_queue::WRITE);

    stable_ksort_local::distribute(
        bucket_bids,
        bucket_sizes,
        nbuckets,
        lognbuckets,
        first,
        last,
        nread_buffers,
        nwrite_buffers);

    double dist_end = timestamp(), end;
    double io_wait_after_d = stats::get_instance()->get_io_wait_time();

    {
        // sort buckets
        unsigned_type write_buffers_multiple_bs = 2;
        unsigned_type max_bucket_size_bl = (m - write_buffers_multiple_bs * ndisks) / 2;    // in number of blocks
        int64 max_bucket_size_rec = int64(max_bucket_size_bl) * block_type::size;           // in number of records
        int64 max_bucket_size_act = 0;                                                      // actual max bucket size
        // establish output stream

        for (i = 0; i < nbuckets; i++)
        {
            max_bucket_size_act = STXXL_MAX(bucket_sizes[i], max_bucket_size_act);
            if (bucket_sizes[i] > max_bucket_size_rec)
            {
                STXXL_ERRMSG("Bucket " << i << " is too large: " << bucket_sizes[i] <<
                             " records, maximum: " << max_bucket_size_rec);
                STXXL_ERRMSG("Recursion on buckets is not yet implemented, aborting.");
                abort();
            }
        }
        // here we could increase write_buffers_multiple_bs knowing max(bucket_sizes[i])
        // ... and decrease max_bucket_size_bl
        const int_type max_bucket_size_act_bl = div_ceil(max_bucket_size_act, block_type::size);
        STXXL_VERBOSE_STABLE_KSORT("Reducing the required number of blocks per bucket from " <<
                                   max_bucket_size_bl << " to " << max_bucket_size_act_bl);
        max_bucket_size_rec = max_bucket_size_act;
        max_bucket_size_bl = max_bucket_size_act_bl;
        const unsigned_type nwrite_buffers_bs = m - 2 * max_bucket_size_bl;
        STXXL_VERBOSE_STABLE_KSORT("Write buffers in bucket sorting phase: " << nwrite_buffers_bs);

        typedef buf_ostream<block_type, typename ExtIterator_::bids_container_iterator> buf_ostream_type;
        buf_ostream_type out(first.bid(), nwrite_buffers_bs);

        disk_queues::get_instance()->set_priority_op(request_queue::READ);

        if (first.block_offset())
        {
            // has to skip part of the first block
            block_type * block = new block_type;
            request_ptr req;
            req = block->read(*first.bid());
            req->wait();

            for (i = 0; i < first.block_offset(); i++)
            {
                out << block->elem[i];
            }
            delete block;
        }
        block_type * blocks1 = new block_type[max_bucket_size_bl];
        block_type * blocks2 = new block_type[max_bucket_size_bl];
        request_ptr * reqs1 = new request_ptr[max_bucket_size_bl];
        request_ptr * reqs2 = new request_ptr[max_bucket_size_bl];
        type_key_ * refs1 = new type_key_[max_bucket_size_rec];
        type_key_ * refs2 = new type_key_[max_bucket_size_rec];
        // submit reading first 2 buckets (Peter's scheme)
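        // Double buffering: while the current bucket (blocks1/reqs1) is being
        // classified and written out, the reads for the following bucket
        // (blocks2/reqs2) are already in flight; the two buffer sets are
        // swapped at the end of each iteration of the main loop below.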
        unsigned_type nbucket_blocks = div_ceil(bucket_sizes[0], block_type::size);
        for (i = 0; i < nbucket_blocks; i++)
            reqs1[i] = blocks1[i].read(bucket_bids[0][i]);


        nbucket_blocks = div_ceil(bucket_sizes[1], block_type::size);
        for (i = 0; i < nbucket_blocks; i++)
            reqs2[i] = blocks2[i].read(bucket_bids[1][i]);


        key_type offset = 0;
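        // k1 is chosen so that the type_key_ references of one sub-bucket
        // roughly fit the L2 cache (STXXL_L2_SIZE), keeping the in-memory
        // classification cache-efficient.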
        const unsigned log_k1 = STXXL_MAX<unsigned>(log2_ceil(max_bucket_size_rec * sizeof(type_key_) / STXXL_L2_SIZE), 1);
        unsigned_type k1 = 1 << log_k1;
        int_type * bucket1 = new int_type[k1];

        const unsigned shift = sizeof(key_type) * 8 - lognbuckets;
        const unsigned shift1 = shift - log_k1;

        STXXL_VERBOSE_STABLE_KSORT("Classifying " << nbuckets << " buckets, max size:" << max_bucket_size_rec <<
                                   " block size:" << block_type::size << " log_k1:" << log_k1);
        for (unsigned_type k = 0; k < nbuckets; k++)
        {
            nbucket_blocks = div_ceil(bucket_sizes[k], block_type::size);
            const unsigned log_k1_k = STXXL_MAX<unsigned>(log2_ceil(bucket_sizes[k] * sizeof(type_key_) / STXXL_L2_SIZE), 1);
            assert(log_k1_k <= log_k1);
            k1 = 1 << log_k1_k;
            std::fill(bucket1, bucket1 + k1, 0);

            STXXL_VERBOSE_STABLE_KSORT("Classifying bucket " << k << " size:" << bucket_sizes[k] <<
                                       " blocks:" << nbucket_blocks << " log_k1:" << log_k1_k);
            // classify the first nbucket_blocks-1 blocks, they are full
            type_key_ * ref_ptr = refs1;
            key_type offset1 = offset + (key_type(1) << key_type(shift)) * key_type(k);
            for (i = 0; i < nbucket_blocks - 1; i++)
            {
                reqs1[i]->wait();
                stable_ksort_local::classify_block(blocks1[i].begin(), blocks1[i].end(), ref_ptr, bucket1, offset1, shift1 /*,k1*/);
            }
            // the last block might be partially filled
            const unsigned_type last_block_size = bucket_sizes[k] - int64(nbucket_blocks - 1) * block_type::size;
            reqs1[i]->wait();

            //STXXL_MSG("block_type::size: " << block_type::size << " last_block_size:" << last_block_size);

            classify_block(blocks1[i].begin(), blocks1[i].begin() + last_block_size, ref_ptr, bucket1, offset1, shift1);

            exclusive_prefix_sum(bucket1, k1);
            classify(refs1, refs1 + bucket_sizes[k], refs2, bucket1, offset1, shift1);

            type_key_ * c = refs2;
            type_key_ * d = refs1;
            for (i = 0; i < k1; i++)
            {
                type_key_ * cEnd = refs2 + bucket1[i];
                type_key_ * dEnd = refs1 + bucket1[i];

                const unsigned log_k2 = log2_floor(bucket1[i]) - 1;     // adaptive bucket size
                const unsigned_type k2 = 1 << log_k2;
                int_type * bucket2 = new int_type[k2];
                const unsigned shift2 = shift1 - log_k2;

                // STXXL_MSG("Sorting bucket " << k << ":" << i);
                l1sort(c, cEnd, d, bucket2, k2,
                       offset1 + (key_type(1) << key_type(shift1)) * key_type(i),
                       shift2);

                // write out all records of the sub-bucket in key order
                for (type_key_ * p = d; p < dEnd; p++)
                    out << (*(p->ptr));


                delete[] bucket2;
                c = cEnd;
                d = dEnd;
            }
            // submit the next read
            const unsigned_type bucket2submit = k + 2;
            if (bucket2submit < nbuckets)
            {
                nbucket_blocks = div_ceil(bucket_sizes[bucket2submit], block_type::size);
                for (i = 0; i < nbucket_blocks; i++)
                    reqs1[i] = blocks1[i].read(bucket_bids[bucket2submit][i]);
            }

            std::swap(blocks1, blocks2);
            std::swap(reqs1, reqs2);
        }

        delete[] bucket1;
        delete[] refs1;
        delete[] refs2;
        delete[] blocks1;
        delete[] blocks2;
        delete[] reqs1;
        delete[] reqs2;
        delete[] bucket_bids;
        delete[] bucket_sizes;

        if (last.block_offset())
        {
            // has to write the remainder of the last block
            block_type * block = new block_type;
            request_ptr req = block->read(*last.bid());
            req->wait();

            for (i = last.block_offset(); i < block_type::size; i++)
            {
                out << block->elem[i];
            }
            delete block;
        }

        end = timestamp();
    }

    STXXL_VERBOSE("Elapsed time        : " << end - begin << " s. Distribution time: " <<
                  dist_end - begin << " s");
    STXXL_VERBOSE("Time in I/O wait(ds): " << io_wait_after_d << " s");
    STXXL_VERBOSE(*stats::get_instance());
    STXXL_UNUSED(begin + dist_end + end + io_wait_after_d);
}

__STXXL_END_NAMESPACE

#endif // !STXXL_STABLE_KSORT_HEADER
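
Example usage (not part of the header): a minimal sketch of the contract
stable_ksort expects from its element type, using a hypothetical record
my_pair and illustrative sizes. The element must provide a nested key_type
and a key() accessor returning it, and the keys should be roughly uniformly
distributed over [0, (std::numeric_limits<key_type>::max)()].

#include <stxxl/vector>
#include <stxxl/bits/algo/stable_ksort.h>

// Hypothetical record type; key_type and key() are the only requirements
// stable_ksort places on the element (see classify_block above).
struct my_pair
{
    typedef stxxl::uint64 key_type;
    key_type m_key;
    stxxl::uint64 m_data;
    key_type key() const { return m_key; }
};

int main()
{
    stxxl::vector<my_pair> v(128 * 1024 * 1024);    // 128 Mi records
    // ... fill v with records whose keys are roughly uniform ...
    stxxl::stable_ksort(v.begin(), v.end(), 512 * 1024 * 1024);    // M = 512 MiB
    return 0;
}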