stable_ksort.h
/***************************************************************************
 *  include/stxxl/bits/algo/stable_ksort.h
 *
 *  Part of the STXXL. See http://stxxl.sourceforge.net
 *
 *  Copyright (C) 2003 Roman Dementiev <[email protected]>
 *  Copyright (C) 2008 Andreas Beckmann <[email protected]>
 *
 *  Distributed under the Boost Software License, Version 1.0.
 *  (See accompanying file LICENSE_1_0.txt or copy at
 *  http://www.boost.org/LICENSE_1_0.txt)
 **************************************************************************/

#ifndef STXXL_ALGO_STABLE_KSORT_HEADER
#define STXXL_ALGO_STABLE_KSORT_HEADER

// This is a first attempt: a distribution sort without sampling.
// stable_ksort will be reworked when time permits.

// (#include directives omitted in this listing)

#ifndef STXXL_VERBOSE_STABLE_KSORT
#define STXXL_VERBOSE_STABLE_KSORT STXXL_VERBOSE1
#endif

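// Overview: stable_ksort proceeds in two phases. In the distribution phase,
// distribute() scans the input once and appends every record to one of
// 2^lognbuckets external-memory buckets chosen by the most significant bits of
// its key. In the bucket-sorting phase, each bucket is read back into internal
// memory, sorted by a two-level distribution pass (classify_block()/classify()
// plus l1sort() from intksort.h), and written to the output in bucket order.
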
STXXL_BEGIN_NAMESPACE

//! \addtogroup stlalgo
//! \{

/*! \internal
 */
namespace stable_ksort_local {

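//! \internal
//! Scans one block of records: appends a (key, pointer) reference for every
//! record to *out and increments bucket[(key - offset) >> shift], counting how
//! many records fall into each sub-bucket.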
template <class Type, class TypeKey>
void classify_block(Type* begin, Type* end, TypeKey*& out,
                    int_type* bucket, typename Type::key_type offset,
                    unsigned shift)
{
    for (Type* p = begin; p < end; p++, out++)  // count & create references
    {
        out->ptr = p;
        typename Type::key_type key = p->key();
        int_type ibucket = (int_type)((key - offset) >> shift);
        out->key = key;
        bucket[ibucket]++;
    }
}

template <typename Type>
struct type_key
{
    typedef typename Type::key_type key_type;
    key_type key;
    Type* ptr;

    type_key() { }
    type_key(key_type k, Type* p) : key(k), ptr(p)
    { }
};

template <typename Type>
bool operator < (const type_key<Type>& a, const type_key<Type>& b)
{
    return a.key < b.key;
}

template <typename Type>
bool operator > (const type_key<Type>& a, const type_key<Type>& b)
{
    return a.key > b.key;
}

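//! \internal
//! Growable sequence of block identifiers (BIDs) for one bucket. operator[]
//! transparently allocates further blocks from the block_manager when an index
//! beyond the current size is accessed, so a bucket may grow past its initial
//! size estimate.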
template <typename BIDType, typename AllocStrategy>
class bid_sequence
{
public:
    typedef BIDType bid_type;
    typedef bid_type& reference;
    typedef AllocStrategy alloc_strategy;
    typedef typename simple_vector<bid_type>::size_type size_type;
    typedef typename simple_vector<bid_type>::iterator iterator;

protected:
    simple_vector<bid_type>* bids;
    alloc_strategy alloc_strategy_;

public:
    bid_sequence() : bids(NULL) { }
    bid_sequence(size_type size_)
    {
        bids = new simple_vector<bid_type>(size_);
        block_manager* mng = block_manager::get_instance();
        mng->new_blocks(alloc_strategy_, bids->begin(), bids->end());
    }
    void init(size_type size_)
    {
        bids = new simple_vector<bid_type>(size_);
        block_manager* mng = block_manager::get_instance();
        mng->new_blocks(alloc_strategy_, bids->begin(), bids->end());
    }
    reference operator [] (size_type i)
    {
        size_type size_ = size();  // cache size in a register
        if (i < size_)
            return *(bids->begin() + i);

        block_manager* mng = block_manager::get_instance();
        simple_vector<bid_type>* larger_bids = new simple_vector<bid_type>((i + 1) * 2);
        std::copy(bids->begin(), bids->end(), larger_bids->begin());
        mng->new_blocks(alloc_strategy_, larger_bids->begin() + size_, larger_bids->end());
        delete bids;
        bids = larger_bids;
        return *(larger_bids->begin() + i);
    }
    size_type size() { return bids->size(); }
    iterator begin() { return bids->begin(); }
    ~bid_sequence()
    {
        block_manager::get_instance()->delete_blocks(bids->begin(), bids->end());
        delete bids;
    }
};

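//! \internal
//! Distribution phase: reads the records in [first, last) once and appends each
//! record to the bucket selected by the lognbuckets most significant bits of its
//! key. On return, bucket_bids[i] holds the block identifiers of bucket i and
//! bucket_sizes[i] its number of records.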
template <typename ExtIterator>
void distribute(
    bid_sequence<typename ExtIterator::vector_type::block_type::bid_type,
                 typename ExtIterator::vector_type::alloc_strategy_type>* bucket_bids,
    int64* bucket_sizes,
    const int_type nbuckets,
    const int_type lognbuckets,
    ExtIterator first,
    ExtIterator last,
    const int_type nread_buffers,
    const int_type nwrite_buffers)
{
    typedef typename ExtIterator::vector_type::value_type value_type;
    typedef typename value_type::key_type key_type;
    typedef typename ExtIterator::block_type block_type;
    typedef typename ExtIterator::bids_container_iterator bids_container_iterator;

    typedef buf_istream<block_type, bids_container_iterator> buf_istream_type;

    int_type i = 0;

    buf_istream_type in(first.bid(), last.bid() + ((last.block_offset()) ? 1 : 0),
                        nread_buffers);

    buffered_writer<block_type> out(
        nbuckets + nwrite_buffers,
        nwrite_buffers);

    unsigned_type* bucket_block_offsets = new unsigned_type[nbuckets];
    unsigned_type* bucket_iblock = new unsigned_type[nbuckets];
    block_type** bucket_blocks = new block_type*[nbuckets];

    std::fill(bucket_sizes, bucket_sizes + nbuckets, 0);
    std::fill(bucket_iblock, bucket_iblock + nbuckets, 0);
    std::fill(bucket_block_offsets, bucket_block_offsets + nbuckets, 0);

    for (i = 0; i < nbuckets; i++)
        bucket_blocks[i] = out.get_free_block();

    ExtIterator cur = first - first.block_offset();

    // skip the part of the first block that lies before 'first'; it stays untouched
    for ( ; cur != first; cur++)
        ++in;

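    // The bucket index is taken from the lognbuckets most significant bits of
    // the key: for example, with a 64-bit key_type and lognbuckets = 10 the
    // shift below is 54, so (key >> shift) yields the top 10 bits.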
    const int_type shift = sizeof(key_type) * 8 - lognbuckets;
    // distribute the records of the range [first, last) into the buckets
    STXXL_VERBOSE_STABLE_KSORT("Shift by: " << shift << " bits, lognbuckets: " << lognbuckets);
    for ( ; cur != last; cur++)
    {
        key_type cur_key = in.current().key();
        int_type ibucket = (int_type)(cur_key >> shift);

        int_type block_offset = bucket_block_offsets[ibucket];
        in >> (bucket_blocks[ibucket]->elem[block_offset++]);
        if (block_offset == block_type::size)
        {
            block_offset = 0;
            int_type iblock = bucket_iblock[ibucket]++;
            bucket_blocks[ibucket] = out.write(bucket_blocks[ibucket], bucket_bids[ibucket][iblock]);
        }
        bucket_block_offsets[ibucket] = block_offset;
    }
    for (i = 0; i < nbuckets; i++)
    {
        if (bucket_block_offsets[i])
        {
            out.write(bucket_blocks[i], bucket_bids[i][bucket_iblock[i]]);
        }
        bucket_sizes[i] = int64(block_type::size) * bucket_iblock[i] +
                          bucket_block_offsets[i];
        STXXL_VERBOSE_STABLE_KSORT("Bucket " << i << " has size " << bucket_sizes[i] <<
                                   ", estimated size: " << ((last - first) / int64(nbuckets)));
    }

    delete[] bucket_blocks;
    delete[] bucket_block_offsets;
    delete[] bucket_iblock;
}

} // namespace stable_ksort_local

//! Sort records with integer keys
//! \param first object of model of \c ext_random_access_iterator concept
//! \param last object of model of \c ext_random_access_iterator concept
//! \param M amount of memory for internal use (in bytes)
//! \remark Elements must provide a method key() which returns the integer key.
//! \remark Not yet fully implemented; it assumes that the keys are uniformly
//! distributed between [0, std::numeric_limits<key_type>::max()].
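//!
//! \par Example
//! A minimal usage sketch; the record type \c my_record, its layout and the
//! 512 MiB memory budget are illustrative assumptions, not part of this header:
//! \code
//! struct my_record
//! {
//!     typedef stxxl::uint64 key_type;
//!     key_type m_key;
//!     char m_payload[24];
//!     key_type key() const { return m_key; }  // key() is required by stable_ksort
//! };
//!
//! typedef stxxl::VECTOR_GENERATOR<my_record>::result vector_type;
//! vector_type v;
//! // ... fill v with records ...
//! stxxl::stable_ksort(v.begin(), v.end(), 512 * 1024 * 1024);  // 512 MiB of internal memory
//! \endcode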
template <typename ExtIterator>
void stable_ksort(ExtIterator first, ExtIterator last, unsigned_type M)
{
    STXXL_MSG("Warning: stable_ksort is not yet fully implemented, it assumes that the keys are uniformly distributed between [0,std::numeric_limits<key_type>::max()]");
    typedef typename ExtIterator::vector_type::value_type value_type;
    typedef typename value_type::key_type key_type;
    typedef typename ExtIterator::block_type block_type;
    typedef typename ExtIterator::bids_container_iterator bids_container_iterator;
    typedef typename block_type::bid_type bid_type;
    typedef typename ExtIterator::vector_type::alloc_strategy_type alloc_strategy;
    typedef stable_ksort_local::bid_sequence<bid_type, alloc_strategy> bucket_bids_type;
    typedef stable_ksort_local::type_key<value_type> type_key_;

    first.flush();     // flush container

    double begin = timestamp();

    unsigned_type i = 0;
    config* cfg = config::get_instance();
    const unsigned_type m = M / block_type::raw_size;
    assert(2 * block_type::raw_size <= M);
    const unsigned_type write_buffers_multiple = 2;
    const unsigned_type read_buffers_multiple = 2;
    const unsigned_type ndisks = cfg->disks_number();
    const unsigned_type min_num_read_write_buffers = (write_buffers_multiple + read_buffers_multiple) * ndisks;
    const unsigned_type nmaxbuckets = m - min_num_read_write_buffers;
    const unsigned int lognbuckets = ilog2_floor(nmaxbuckets);
    const unsigned_type nbuckets = unsigned_type(1) << lognbuckets;
    const unsigned_type est_bucket_size = (unsigned_type)div_ceil((last - first) / nbuckets, block_type::size);  // in blocks

    if (m < min_num_read_write_buffers + 2 || nbuckets < 2) {
        STXXL_ERRMSG("stxxl::stable_ksort: Not enough memory. Blocks available: " << m <<
                     ", required for r/w buffers: " << min_num_read_write_buffers <<
                     ", required for buckets: 2, nbuckets: " << nbuckets);
        throw bad_parameter("stxxl::stable_ksort(): INSUFFICIENT MEMORY provided, please increase parameter 'M'");
    }
    STXXL_VERBOSE_STABLE_KSORT("Elements to sort: " << (last - first));
    STXXL_VERBOSE_STABLE_KSORT("Number of buckets has to be reduced from " << nmaxbuckets << " to " << nbuckets);
    const unsigned_type nread_buffers = (m - nbuckets) * read_buffers_multiple / (read_buffers_multiple + write_buffers_multiple);
    const unsigned_type nwrite_buffers = (m - nbuckets) * write_buffers_multiple / (read_buffers_multiple + write_buffers_multiple);

    STXXL_VERBOSE_STABLE_KSORT("Read buffers in distribution phase: " << nread_buffers);
    STXXL_VERBOSE_STABLE_KSORT("Write buffers in distribution phase: " << nwrite_buffers);

    bucket_bids_type* bucket_bids = new bucket_bids_type[nbuckets];
    for (i = 0; i < nbuckets; ++i)
        bucket_bids[i].init(est_bucket_size);

    int64* bucket_sizes = new int64[nbuckets];

    disk_queues::get_instance()->set_priority_op(request_queue::WRITE);

    stable_ksort_local::distribute(
        bucket_bids,
        bucket_sizes,
        nbuckets,
        lognbuckets,
        first,
        last,
        nread_buffers,
        nwrite_buffers);

    double dist_end = timestamp(), end;
    double io_wait_after_d = stats::get_instance()->get_io_wait_time();

    {
        // sort buckets
        unsigned_type write_buffers_multiple_bs = 2;
        unsigned_type max_bucket_size_bl = (m - write_buffers_multiple_bs * ndisks) / 2;  // in number of blocks
        int64 max_bucket_size_rec = int64(max_bucket_size_bl) * block_type::size;  // in number of records
        int64 max_bucket_size_act = 0;  // actual max bucket size
        // establish output stream

        for (i = 0; i < nbuckets; i++)
        {
            max_bucket_size_act = STXXL_MAX(bucket_sizes[i], max_bucket_size_act);
            if (bucket_sizes[i] > max_bucket_size_rec)
            {
                STXXL_ERRMSG("Bucket " << i << " is too large: " << bucket_sizes[i] <<
                             " records, maximum: " << max_bucket_size_rec);
                STXXL_ERRMSG("Recursion on buckets is not yet implemented, aborting.");
                abort();
            }
        }
        // here we can increase write_buffers_multiple_bs knowing max(bucket_sizes[i])
        // ... and decrease max_bucket_size_bl
        const int_type max_bucket_size_act_bl = (int_type)div_ceil(max_bucket_size_act, block_type::size);
        STXXL_VERBOSE_STABLE_KSORT("Reducing the required number of blocks per bucket from " <<
                                   max_bucket_size_bl << " to " << max_bucket_size_act_bl);
        max_bucket_size_rec = max_bucket_size_act;
        max_bucket_size_bl = max_bucket_size_act_bl;
        const unsigned_type nwrite_buffers_bs = m - 2 * max_bucket_size_bl;
        STXXL_VERBOSE_STABLE_KSORT("Write buffers in bucket sorting phase: " << nwrite_buffers_bs);

        typedef buf_ostream<block_type, bids_container_iterator> buf_ostream_type;
        buf_ostream_type out(first.bid(), nwrite_buffers_bs);

        disk_queues::get_instance()->set_priority_op(request_queue::READ);

        if (first.block_offset())
        {
            // copy the part of the first block that precedes 'first' through unchanged
            block_type* block = new block_type;
            request_ptr req;
            req = block->read(*first.bid());
            req->wait();

            for (i = 0; i < first.block_offset(); i++)
            {
                out << block->elem[i];
            }
            delete block;
        }
        block_type* blocks1 = new block_type[max_bucket_size_bl];
        block_type* blocks2 = new block_type[max_bucket_size_bl];
        request_ptr* reqs1 = new request_ptr[max_bucket_size_bl];
        request_ptr* reqs2 = new request_ptr[max_bucket_size_bl];
        type_key_* refs1 = new type_key_[(size_t)max_bucket_size_rec];
        type_key_* refs2 = new type_key_[(size_t)max_bucket_size_rec];

        // submit read requests for the first two buckets (Peter's scheme)
        unsigned_type nbucket_blocks = (unsigned_type)div_ceil(bucket_sizes[0], block_type::size);
        for (i = 0; i < nbucket_blocks; i++)
            reqs1[i] = blocks1[i].read(bucket_bids[0][i]);

        nbucket_blocks = (unsigned_type)div_ceil(bucket_sizes[1], block_type::size);
        for (i = 0; i < nbucket_blocks; i++)
            reqs2[i] = blocks2[i].read(bucket_bids[1][i]);

        key_type offset = 0;
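
        // Choose k1 = 2^log_k1 sub-buckets so that the reference array of one
        // bucket (max_bucket_size_rec * sizeof(type_key_) bytes) is split into
        // chunks of roughly STXXL_L2_SIZE each, i.e. one sub-bucket's references
        // fit into the L2 cache.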
        const unsigned log_k1 = STXXL_MAX<unsigned>(ilog2_ceil(max_bucket_size_rec * sizeof(type_key_) / STXXL_L2_SIZE), 1);
        unsigned_type k1 = unsigned_type(1) << log_k1;
        int_type* bucket1 = new int_type[k1];

        const unsigned int shift = (unsigned int)(sizeof(key_type) * 8 - lognbuckets);
        const unsigned int shift1 = shift - log_k1;

        STXXL_VERBOSE_STABLE_KSORT("Classifying " << nbuckets << " buckets, max size:" << max_bucket_size_rec <<
                                   " block size:" << block_type::size << " log_k1:" << log_k1);

        for (unsigned_type k = 0; k < nbuckets; k++)
        {
            nbucket_blocks = (unsigned_type)div_ceil(bucket_sizes[k], block_type::size);
            const unsigned log_k1_k = STXXL_MAX<unsigned>(ilog2_ceil(bucket_sizes[k] * sizeof(type_key_) / STXXL_L2_SIZE), 1);
            assert(log_k1_k <= log_k1);
            k1 = (unsigned_type)(1) << log_k1_k;
            std::fill(bucket1, bucket1 + k1, 0);

            STXXL_VERBOSE_STABLE_KSORT("Classifying bucket " << k << " size:" << bucket_sizes[k] <<
                                       " blocks:" << nbucket_blocks << " log_k1:" << log_k1_k);
            // classify the first nbucket_blocks-1 blocks; they are full
            type_key_* ref_ptr = refs1;
            key_type offset1 = offset + (key_type(1) << key_type(shift)) * key_type(k);
            for (i = 0; i < nbucket_blocks - 1; i++)
            {
                reqs1[i]->wait();
                stable_ksort_local::classify_block(blocks1[i].begin(), blocks1[i].end(),
                                                   ref_ptr, bucket1, offset1, shift1 /*,k1*/);
            }
            // the last block might not be full
            const unsigned_type last_block_size =
                (unsigned_type)(bucket_sizes[k] - (nbucket_blocks - 1) * block_type::size);
            reqs1[i]->wait();

            //STXXL_MSG("block_type::size: "<<block_type::size<<" last_block_size:"<<last_block_size);

            classify_block(blocks1[i].begin(), blocks1[i].begin() + last_block_size,
                           ref_ptr, bucket1, offset1, shift1);

            exclusive_prefix_sum(bucket1, k1);
            classify(refs1, refs1 + bucket_sizes[k], refs2, bucket1, offset1, shift1);

            type_key_* c = refs2;
            type_key_* d = refs1;
            for (i = 0; i < k1; i++)
            {
                type_key_* cEnd = refs2 + bucket1[i];
                type_key_* dEnd = refs1 + bucket1[i];

                const unsigned log_k2 = ilog2_floor(bucket1[i]) - 1;  // adaptive bucket size
                const unsigned_type k2 = unsigned_type(1) << log_k2;
                int_type* bucket2 = new int_type[k2];
                const unsigned shift2 = shift1 - log_k2;

                // STXXL_MSG("Sorting bucket "<<k<<":"<<i);
                l1sort(c, cEnd, d, bucket2, k2,
                       offset1 + (key_type(1) << key_type(shift1)) * key_type(i),
                       shift2);

                // write out all
                for (type_key_* p = d; p < dEnd; p++)
                    out << (*(p->ptr));

                delete[] bucket2;
                c = cEnd;
                d = dEnd;
            }
            // submit the next read
            const unsigned_type bucket2submit = k + 2;
            if (bucket2submit < nbuckets)
            {
                nbucket_blocks = (unsigned_type)div_ceil(bucket_sizes[bucket2submit], block_type::size);
                for (i = 0; i < nbucket_blocks; i++)
                    reqs1[i] = blocks1[i].read(bucket_bids[bucket2submit][i]);
            }

            std::swap(blocks1, blocks2);
            std::swap(reqs1, reqs2);
        }

        delete[] bucket1;
        delete[] refs1;
        delete[] refs2;
        delete[] blocks1;
        delete[] blocks2;
        delete[] reqs1;
        delete[] reqs2;
        delete[] bucket_bids;
        delete[] bucket_sizes;

        if (last.block_offset())
        {
            // copy the rest of the last block through unchanged
            block_type* block = new block_type;
            request_ptr req = block->read(*last.bid());
            req->wait();

            for (i = last.block_offset(); i < block_type::size; i++)
            {
                out << block->elem[i];
            }
            delete block;
        }

        end = timestamp();
    }

    STXXL_VERBOSE("Elapsed time : " << end - begin << " s. Distribution time: " <<
                  dist_end - begin << " s");
    STXXL_VERBOSE("Time in I/O wait(ds): " << io_wait_after_d << " s");
    STXXL_VERBOSE(*stats::get_instance());
}

//! \}

STXXL_END_NAMESPACE

#endif // !STXXL_ALGO_STABLE_KSORT_HEADER