14 #ifndef STXXL_ALGO_STABLE_KSORT_HEADER
15 #define STXXL_ALGO_STABLE_KSORT_HEADER
28 #ifndef STXXL_VERBOSE_STABLE_KSORT
29 #define STXXL_VERBOSE_STABLE_KSORT STXXL_VERBOSE1
39 namespace stable_ksort_local {
41 template <
class Type,
class TypeKey>
43 int_type* bucket,
typename Type::key_type offset,
46 for (Type* p = begin; p < end; p++, out++)
49 typename Type::key_type key = p->key();
56 template <
typename Type>
68 template <
typename Type>
74 template <
typename Type>
80 template <
typename BIDType,
typename AllocStrategy>
100 mng->new_blocks(alloc_strategy_, bids->begin(), bids->end());
106 mng->new_blocks(alloc_strategy_, bids->begin(), bids->end());
112 return *(bids->begin() + i);
116 std::copy(bids->begin(), bids->end(), larger_bids->
begin());
120 return *(larger_bids->
begin() + i);
126 block_manager::get_instance()->delete_blocks(bids->begin(), bids->end());
131 template <
typename ExtIterator>
133 bid_sequence<
typename ExtIterator::vector_type::block_type::bid_type,
134 typename ExtIterator::vector_type::alloc_strategy_type>* bucket_bids,
143 typedef typename ExtIterator::vector_type::value_type value_type;
144 typedef typename value_type::key_type key_type;
145 typedef typename ExtIterator::block_type block_type;
146 typedef typename ExtIterator::bids_container_iterator bids_container_iterator;
152 buf_istream_type in(first.bid(), last.bid() + ((first.block_offset()) ? 1 : 0),
156 nbuckets + nwrite_buffers,
161 block_type** bucket_blocks =
new block_type*[nbuckets];
163 std::fill(bucket_sizes, bucket_sizes + nbuckets, 0);
164 std::fill(bucket_iblock, bucket_iblock + nbuckets, 0);
165 std::fill(bucket_block_offsets, bucket_block_offsets + nbuckets, 0);
167 for (i = 0; i < nbuckets; i++)
168 bucket_blocks[i] = out.get_free_block();
170 ExtIterator cur = first - first.block_offset();
173 for ( ; cur != first; cur++)
176 const int_type shift =
sizeof(key_type) * 8 - lognbuckets;
179 for ( ; cur != last; cur++)
181 key_type cur_key = in.current().key();
184 int_type block_offset = bucket_block_offsets[ibucket];
185 in >> (bucket_blocks[ibucket]->elem[block_offset++]);
186 if (block_offset == block_type::size)
189 int_type iblock = bucket_iblock[ibucket]++;
190 bucket_blocks[ibucket] = out.write(bucket_blocks[ibucket], bucket_bids[ibucket][iblock]);
192 bucket_block_offsets[ibucket] = block_offset;
194 for (i = 0; i < nbuckets; i++)
196 if (bucket_block_offsets[i])
198 out.write(bucket_blocks[i], bucket_bids[i][bucket_iblock[i]]);
200 bucket_sizes[i] =
int64(block_type::size) * bucket_iblock[i] +
201 bucket_block_offsets[i];
203 ", estimated size: " << ((last - first) /
int64(nbuckets)));
206 delete[] bucket_blocks;
207 delete[] bucket_block_offsets;
208 delete[] bucket_iblock;
220 template <
typename ExtIterator>
223 STXXL_MSG(
"Warning: stable_ksort is not yet fully implemented, it assumes that the keys are uniformly distributed between [0,std::numeric_limits<key_type>::max()]");
224 typedef typename ExtIterator::vector_type::value_type value_type;
225 typedef typename value_type::key_type key_type;
226 typedef typename ExtIterator::block_type block_type;
227 typedef typename ExtIterator::bids_container_iterator bids_container_iterator;
228 typedef typename block_type::bid_type bid_type;
229 typedef typename ExtIterator::vector_type::alloc_strategy_type alloc_strategy;
238 config* cfg = config::get_instance();
240 assert(2 * block_type::raw_size <= M);
244 const unsigned_type min_num_read_write_buffers = (write_buffers_multiple + read_buffers_multiple) * ndisks;
245 const unsigned_type nmaxbuckets = m - min_num_read_write_buffers;
246 const unsigned int lognbuckets =
ilog2_floor(nmaxbuckets);
250 if (m < min_num_read_write_buffers + 2 || nbuckets < 2) {
251 STXXL_ERRMSG(
"stxxl::stable_ksort: Not enough memory. Blocks available: " << m <<
252 ", required for r/w buffers: " << min_num_read_write_buffers <<
253 ", required for buckets: 2, nbuckets: " << nbuckets);
254 throw bad_parameter(
"stxxl::stable_ksort(): INSUFFICIENT MEMORY provided, please increase parameter 'M'");
258 const unsigned_type nread_buffers = (m - nbuckets) * read_buffers_multiple / (read_buffers_multiple + write_buffers_multiple);
259 const unsigned_type nwrite_buffers = (m - nbuckets) * write_buffers_multiple / (read_buffers_multiple + write_buffers_multiple);
264 bucket_bids_type* bucket_bids =
new bucket_bids_type[nbuckets];
265 for (i = 0; i < nbuckets; ++i)
266 bucket_bids[i].init(est_bucket_size);
270 disk_queues::get_instance()->set_priority_op(request_queue::WRITE);
283 double io_wait_after_d = stats::get_instance()->get_io_wait_time();
288 unsigned_type max_bucket_size_bl = (m - write_buffers_multiple_bs * ndisks) / 2;
289 int64 max_bucket_size_rec =
int64(max_bucket_size_bl) * block_type::size;
290 int64 max_bucket_size_act = 0;
293 for (i = 0; i < nbuckets; i++)
295 max_bucket_size_act =
STXXL_MAX(bucket_sizes[i], max_bucket_size_act);
296 if (bucket_sizes[i] > max_bucket_size_rec)
298 STXXL_ERRMSG(
"Bucket " << i <<
" is too large: " << bucket_sizes[i] <<
299 " records, maximum: " << max_bucket_size_rec);
300 STXXL_ERRMSG(
"Recursion on buckets is not yet implemented, aborting.");
308 max_bucket_size_bl <<
" to " << max_bucket_size_act_bl);
309 max_bucket_size_rec = max_bucket_size_act;
310 max_bucket_size_bl = max_bucket_size_act_bl;
311 const unsigned_type nwrite_buffers_bs = m - 2 * max_bucket_size_bl;
315 buf_ostream_type out(first.bid(), nwrite_buffers_bs);
317 disk_queues::get_instance()->set_priority_op(request_queue::READ);
319 if (first.block_offset())
322 block_type* block =
new block_type;
324 req = block->read(*first.bid());
327 for (i = 0; i < first.block_offset(); i++)
329 out << block->elem[i];
333 block_type* blocks1 =
new block_type[max_bucket_size_bl];
334 block_type* blocks2 =
new block_type[max_bucket_size_bl];
337 type_key_* refs1 =
new type_key_[(size_t)max_bucket_size_rec];
338 type_key_* refs2 =
new type_key_[(size_t)max_bucket_size_rec];
342 for (i = 0; i < nbucket_blocks; i++)
343 reqs1[i] = blocks1[i].read(bucket_bids[0][i]);
346 for (i = 0; i < nbucket_blocks; i++)
347 reqs2[i] = blocks2[i].read(bucket_bids[1][i]);
350 const unsigned log_k1 = STXXL_MAX<unsigned>(
ilog2_ceil(max_bucket_size_rec *
sizeof(type_key_) /
STXXL_L2_SIZE), 1);
354 const unsigned int shift = (
unsigned int)(
sizeof(key_type) * 8 - lognbuckets);
355 const unsigned int shift1 = shift - log_k1;
358 " block size:" << block_type::size <<
" log_k1:" << log_k1);
363 const unsigned log_k1_k = STXXL_MAX<unsigned>(
ilog2_ceil(bucket_sizes[k] *
sizeof(type_key_) /
STXXL_L2_SIZE), 1);
364 assert(log_k1_k <= log_k1);
366 std::fill(bucket1, bucket1 + k1, 0);
369 " blocks:" << nbucket_blocks <<
" log_k1:" << log_k1_k);
371 type_key_* ref_ptr = refs1;
372 key_type offset1 = offset + (key_type(1) << key_type(shift)) * key_type(k);
373 for (i = 0; i < nbucket_blocks - 1; i++)
377 ref_ptr, bucket1, offset1, shift1 );
381 (
unsigned_type)(bucket_sizes[k] - (nbucket_blocks - 1) * block_type::size);
386 classify_block(blocks1[i].begin(), blocks1[i].begin() + last_block_size,
387 ref_ptr, bucket1, offset1, shift1);
390 classify(refs1, refs1 + bucket_sizes[k], refs2, bucket1, offset1, shift1);
392 type_key_* c = refs2;
393 type_key_* d = refs1;
394 for (i = 0; i < k1; i++)
396 type_key_* cEnd = refs2 + bucket1[i];
397 type_key_* dEnd = refs1 + bucket1[i];
399 const unsigned log_k2 =
ilog2_floor(bucket1[i]) - 1;
402 const unsigned shift2 = shift1 - log_k2;
405 l1sort(c, cEnd, d, bucket2, k2,
406 offset1 + (key_type(1) << key_type(shift1)) * key_type(i),
410 for (type_key_* p = d; p < dEnd; p++)
419 if (bucket2submit < nbuckets)
422 for (i = 0; i < nbucket_blocks; i++)
423 reqs1[i] = blocks1[i].read(bucket_bids[bucket2submit][i]);
426 std::swap(blocks1, blocks2);
427 std::swap(reqs1, reqs2);
437 delete[] bucket_bids;
438 delete[] bucket_sizes;
440 if (last.block_offset())
443 block_type* block =
new block_type;
447 for (i = last.block_offset(); i < block_type::size; i++)
449 out << block->elem[i];
457 STXXL_VERBOSE(
"Elapsed time : " << end - begin <<
" s. Distribution time: " <<
458 dist_end - begin <<
" s");
459 STXXL_VERBOSE(
"Time in I/O wait(ds): " << io_wait_after_d <<
" s");
467 #endif // !STXXL_ALGO_STABLE_KSORT_HEADER
void init(size_type size_)
compat::remove_const< Integral >::type div_ceil(Integral n, Integral2 d)
#define STXXL_VERBOSE_STABLE_KSORT
void distribute(bid_sequence< typename ExtIterator::vector_type::block_type::bid_type, typename ExtIterator::vector_type::alloc_strategy_type > *bucket_bids, int64 *bucket_sizes, const int_type nbuckets, const int_type lognbuckets, ExtIterator first, ExtIterator last, const int_type nread_buffers, const int_type nwrite_buffers)
simple_vector< bid_type > * bids
High-performance smart pointer used as a wrapping reference counting pointer.
iterator end()
return mutable iterator beyond last element
static void classify(TypeKey *a, TypeKey *aEnd, TypeKey *b, int_type *bucket, typename TypeKey::key_type offset, unsigned shift)
void classify_block(Type *begin, Type *end, TypeKey *&out, int_type *bucket, typename KeyExtractor::key_type offset, unsigned shift, KeyExtractor keyobj)
choose_int_types< my_pointer_size >::int_type int_type
bool operator>(const uint_pair &b) const
greater comparison operator
iterator begin()
return mutable iterator to first element
size_t disks_number()
Returns number of disks available to user.
const Type & STXXL_MAX(const Type &a, const Type &b)
#define STXXL_BEGIN_NAMESPACE
simple_vector< bid_type >::size_type size_type
void l1sort(TypeKey *a, TypeKey *aEnd, TypeKey *b, int_type *bucket, int_type K, typename TypeKey::key_type offset, int shift)
unsigned int ilog2_floor(IntegerType i)
calculate the log2 floor of an integer type (by repeated bit shifts)
static void exclusive_prefix_sum(int_type *bucket, int_type K)
void new_blocks(const DiskAssignFunctor &functor, BIDIteratorClass bidbegin, BIDIteratorClass bidend, unsigned_type offset=0)
Allocates new blocks.
void stable_ksort(ExtIterator first, ExtIterator last, unsigned_type M)
Sort records with integer keys.
Simpler non-growing vector without initialization.
choose_int_types< my_pointer_size >::unsigned_type unsigned_type
unsigned int ilog2_ceil(const IntegerType &i)
calculate the log2 ceiling of an integer type (by repeated bit shifts)
Access point to disks properties. Since 1.4.0: no config files are read automatically! ...
double timestamp()
Returns number of seconds since the epoch, high resolution.
alloc_strategy alloc_strategy_
simple_vector< bid_type >::iterator iterator
type_key(key_type k, Type *p)
bid_sequence(size_type size_)
#define STXXL_END_NAMESPACE
AllocStrategy alloc_strategy