00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014 #ifndef STXXL_STABLE_KSORT_HEADER
00015 #define STXXL_STABLE_KSORT_HEADER
00016
00017
00018
00019
00020
00021 #include <stxxl/bits/mng/mng.h>
00022 #include <stxxl/bits/mng/buf_istream.h>
00023 #include <stxxl/bits/mng/buf_ostream.h>
00024 #include <stxxl/bits/common/simple_vector.h>
00025 #include <stxxl/bits/algo/intksort.h>
00026 #include <stxxl/bits/algo/sort_base.h>
00027 #include <stxxl/bits/common/utils.h>
00028
00029
00030 #ifndef STXXL_VERBOSE_STABLE_KSORT
00031 #define STXXL_VERBOSE_STABLE_KSORT STXXL_VERBOSE1
00032 #endif
00033
00034
00035 __STXXL_BEGIN_NAMESPACE
00036
00039
00042 namespace stable_ksort_local
00043 {
00044 template <class type_, class type_key>
00045 void classify_block(type_ * begin, type_ * end, type_key * & out, int_type * bucket, unsigned_type offset, unsigned shift)
00046 {
00047 for (type_ * p = begin; p < end; p++, out++)
00048 {
00049 out->ptr = p;
00050 typename type_::key_type key = p->key();
00051 int_type ibucket = (key - offset) >> shift;
00052 out->key = key;
00053 bucket[ibucket]++;
00054 }
00055 }
00056
/*!
 * \brief Sort record pairing a key value with a pointer to an element.
 *
 * \tparam type  element type; must provide a nested \c key_type
 */
template <typename type>
struct type_key
{
    typedef typename type::key_type key_type;

    key_type key;   //!< key value of the record
    type * ptr;     //!< element the record refers to

    //! Default constructor: leaves \c key and \c ptr uninitialized.
    type_key()
    { }

    //! Builds a record from key \a k and element pointer \a p.
    type_key(key_type k, type * p)
        : key(k),
          ptr(p)
    { }
};

//! Orders two records by key only; the element pointers are not compared.
template <typename type>
bool operator < (const type_key<type> & a, const type_key<type> & b)
{
    const typename type_key<type>::key_type & lhs_key = a.key;
    const typename type_key<type>::key_type & rhs_key = b.key;
    return lhs_key < rhs_key;
}

//! Reverse ordering of two records by key only (uses the key's operator>).
template <typename type>
bool operator > (const type_key<type> & a, const type_key<type> & b)
{
    const typename type_key<type>::key_type & lhs_key = a.key;
    const typename type_key<type>::key_type & rhs_key = b.key;
    return lhs_key > rhs_key;
}
00080
00081
00082 template <typename BIDType_, typename AllocStrategy_>
00083 class bid_sequence
00084 {
00085 public:
00086 typedef BIDType_ bid_type;
00087 typedef bid_type & reference;
00088 typedef AllocStrategy_ alloc_strategy;
00089 typedef typename simple_vector<bid_type>::size_type size_type;
00090 typedef typename simple_vector<bid_type>::iterator iterator;
00091
00092 protected:
00093 simple_vector<bid_type> * bids;
00094 alloc_strategy alloc_strategy_;
00095
00096 public:
00097 bid_sequence() { }
00098 bid_sequence(size_type size_)
00099 {
00100 bids = new simple_vector<bid_type>(size_);
00101 block_manager * mng = block_manager::get_instance();
00102 mng->new_blocks(alloc_strategy_, bids->begin(), bids->end());
00103 }
00104 void init(size_type size_)
00105 {
00106 bids = new simple_vector<bid_type>(size_);
00107 block_manager * mng = block_manager::get_instance();
00108 mng->new_blocks(alloc_strategy_, bids->begin(), bids->end());
00109 }
00110 reference operator [] (size_type i)
00111 {
00112 size_type size_ = size();
00113 if (i < size_)
00114 return *(bids->begin() + i);
00115
00116 block_manager * mng = block_manager::get_instance();
00117 simple_vector<bid_type> * larger_bids = new simple_vector<bid_type>((i + 1) * 2);
00118 std::copy(bids->begin(), bids->end(), larger_bids->begin());
00119 mng->new_blocks(alloc_strategy_, larger_bids->begin() + size_, larger_bids->end());
00120 delete bids;
00121 bids = larger_bids;
00122 return *(larger_bids->begin() + i);
00123 }
00124 size_type size() { return bids->size(); }
00125 iterator begin() { return bids->begin(); }
00126 ~bid_sequence()
00127 {
00128 block_manager::get_instance()->delete_blocks(bids->begin(), bids->end());
00129 delete bids;
00130 }
00131 };
00132
00133 template <typename ExtIterator_>
00134 void distribute(
00135 bid_sequence<typename ExtIterator_::vector_type::block_type::bid_type,
00136 typename ExtIterator_::vector_type::alloc_strategy_type> * bucket_bids,
00137 int64 * bucket_sizes,
00138 const int_type nbuckets,
00139 const int_type lognbuckets,
00140 ExtIterator_ first,
00141 ExtIterator_ last,
00142 const int_type nread_buffers,
00143 const int_type nwrite_buffers)
00144 {
00145 typedef typename ExtIterator_::vector_type::value_type value_type;
00146 typedef typename value_type::key_type key_type;
00147 typedef typename ExtIterator_::block_type block_type;
00148 typedef typename block_type::bid_type bid_type;
00149 typedef buf_istream<typename ExtIterator_::block_type,
00150 typename ExtIterator_::bids_container_iterator> buf_istream_type;
00151
00152 int_type i = 0;
00153
00154 buf_istream_type in(first.bid(), last.bid() + ((first.block_offset()) ? 1 : 0),
00155 nread_buffers);
00156
00157 buffered_writer<block_type> out(
00158 nbuckets + nwrite_buffers,
00159 nwrite_buffers);
00160
00161 unsigned_type * bucket_block_offsets = new unsigned_type[nbuckets];
00162 unsigned_type * bucket_iblock = new unsigned_type[nbuckets];
00163 block_type ** bucket_blocks = new block_type *[nbuckets];
00164
00165 std::fill(bucket_sizes, bucket_sizes + nbuckets, 0);
00166 std::fill(bucket_iblock, bucket_iblock + nbuckets, 0);
00167 std::fill(bucket_block_offsets, bucket_block_offsets + nbuckets, 0);
00168
00169 for (i = 0; i < nbuckets; i++)
00170 bucket_blocks[i] = out.get_free_block();
00171
00172
00173 ExtIterator_ cur = first - first.block_offset();
00174
00175
00176 for ( ; cur != first; cur++)
00177 ++in;
00178
00179
00180 const int_type shift = sizeof(key_type) * 8 - lognbuckets;
00181
00182 STXXL_VERBOSE_STABLE_KSORT("Shift by: " << shift << " bits, lognbuckets: " << lognbuckets);
00183 for ( ; cur != last; cur++)
00184 {
00185 key_type cur_key = in.current().key();
00186 int_type ibucket = cur_key >> shift;
00187
00188 int_type block_offset = bucket_block_offsets[ibucket];
00189 in >> (bucket_blocks[ibucket]->elem[block_offset++]);
00190 if (block_offset == block_type::size)
00191 {
00192 block_offset = 0;
00193 int_type iblock = bucket_iblock[ibucket]++;
00194 bucket_blocks[ibucket] = out.write(bucket_blocks[ibucket], bucket_bids[ibucket][iblock]);
00195 }
00196 bucket_block_offsets[ibucket] = block_offset;
00197 }
00198 for (i = 0; i < nbuckets; i++)
00199 {
00200 if (bucket_block_offsets[i])
00201 {
00202 out.write(bucket_blocks[i], bucket_bids[i][bucket_iblock[i]]);
00203 }
00204 bucket_sizes[i] = int64(block_type::size) * bucket_iblock[i] +
00205 bucket_block_offsets[i];
00206 STXXL_VERBOSE_STABLE_KSORT("Bucket " << i << " has size " << bucket_sizes[i] <<
00207 ", estimated size: " << ((last - first) / int64(nbuckets)));
00208 }
00209
00210 delete[] bucket_blocks;
00211 delete[] bucket_block_offsets;
00212 delete[] bucket_iblock;
00213 }
00214 }
00215
00223 template <typename ExtIterator_>
00224 void stable_ksort(ExtIterator_ first, ExtIterator_ last, unsigned_type M)
00225 {
00226 STXXL_MSG("Warning: stable_ksort is not yet fully implemented, it assumes that the keys are uniformly distributed between [0,(std::numeric_limits<key_type>::max)()]");
00227 typedef typename ExtIterator_::vector_type::value_type value_type;
00228 typedef typename value_type::key_type key_type;
00229 typedef typename ExtIterator_::block_type block_type;
00230 typedef typename block_type::bid_type bid_type;
00231 typedef typename ExtIterator_::vector_type::alloc_strategy_type alloc_strategy;
00232 typedef stable_ksort_local::bid_sequence<bid_type, alloc_strategy> bucket_bids_type;
00233 typedef stable_ksort_local::type_key<value_type> type_key_;
00234
00235 first.flush();
00236
00237 double begin = timestamp();
00238
00239 unsigned_type i = 0;
00240 config * cfg = config::get_instance();
00241 const unsigned_type m = M / block_type::raw_size;
00242 assert(2 * block_type::raw_size <= M);
00243 const unsigned_type write_buffers_multiple = 2;
00244 const unsigned_type read_buffers_multiple = 2;
00245 const unsigned_type ndisks = cfg->disks_number();
00246 const unsigned_type min_num_read_write_buffers = (write_buffers_multiple + read_buffers_multiple) * ndisks;
00247 const unsigned_type nmaxbuckets = m - min_num_read_write_buffers;
00248 const unsigned_type lognbuckets = log2_floor(nmaxbuckets);
00249 const unsigned_type nbuckets = 1 << lognbuckets;
00250 const unsigned_type est_bucket_size = div_ceil((last - first) / nbuckets, block_type::size);
00251
00252 if (m < min_num_read_write_buffers + 2 || nbuckets < 2) {
00253 STXXL_ERRMSG("stxxl::stable_ksort: Not enough memory. Blocks available: " << m <<
00254 ", required for r/w buffers: " << min_num_read_write_buffers <<
00255 ", required for buckets: 2, nbuckets: " << nbuckets);
00256 throw bad_parameter("stxxl::stable_ksort(): INSUFFICIENT MEMORY provided, please increase parameter 'M'");
00257 }
00258 STXXL_VERBOSE_STABLE_KSORT("Elements to sort: " << (last - first));
00259 STXXL_VERBOSE_STABLE_KSORT("Number of buckets has to be reduced from " << nmaxbuckets << " to " << nbuckets);
00260 const unsigned_type nread_buffers = (m - nbuckets) * read_buffers_multiple / (read_buffers_multiple + write_buffers_multiple);
00261 const unsigned_type nwrite_buffers = (m - nbuckets) * write_buffers_multiple / (read_buffers_multiple + write_buffers_multiple);
00262
00263 STXXL_VERBOSE_STABLE_KSORT("Read buffers in distribution phase: " << nread_buffers);
00264 STXXL_VERBOSE_STABLE_KSORT("Write buffers in distribution phase: " << nwrite_buffers);
00265
00266 bucket_bids_type * bucket_bids = new bucket_bids_type[nbuckets];
00267 for (i = 0; i < nbuckets; ++i)
00268 bucket_bids[i].init(est_bucket_size);
00269
00270 int64 * bucket_sizes = new int64[nbuckets];
00271
00272 disk_queues::get_instance()->set_priority_op(request_queue::WRITE);
00273
00274 stable_ksort_local::distribute(
00275 bucket_bids,
00276 bucket_sizes,
00277 nbuckets,
00278 lognbuckets,
00279 first,
00280 last,
00281 nread_buffers,
00282 nwrite_buffers);
00283
00284 double dist_end = timestamp(), end;
00285 double io_wait_after_d = stats::get_instance()->get_io_wait_time();
00286
00287 {
00288
00289 unsigned_type write_buffers_multiple_bs = 2;
00290 unsigned_type max_bucket_size_bl = (m - write_buffers_multiple_bs * ndisks) / 2;
00291 int64 max_bucket_size_rec = int64(max_bucket_size_bl) * block_type::size;
00292 int64 max_bucket_size_act = 0;
00293
00294
00295 for (i = 0; i < nbuckets; i++)
00296 {
00297 max_bucket_size_act = STXXL_MAX(bucket_sizes[i], max_bucket_size_act);
00298 if (bucket_sizes[i] > max_bucket_size_rec)
00299 {
00300 STXXL_ERRMSG("Bucket " << i << " is too large: " << bucket_sizes[i] <<
00301 " records, maximum: " << max_bucket_size_rec);
00302 STXXL_ERRMSG("Recursion on buckets is not yet implemented, aborting.");
00303 abort();
00304 }
00305 }
00306
00307
00308 const int_type max_bucket_size_act_bl = div_ceil(max_bucket_size_act, block_type::size);
00309 STXXL_VERBOSE_STABLE_KSORT("Reducing required number of required blocks per bucket from " <<
00310 max_bucket_size_bl << " to " << max_bucket_size_act_bl);
00311 max_bucket_size_rec = max_bucket_size_act;
00312 max_bucket_size_bl = max_bucket_size_act_bl;
00313 const unsigned_type nwrite_buffers_bs = m - 2 * max_bucket_size_bl;
00314 STXXL_VERBOSE_STABLE_KSORT("Write buffers in bucket sorting phase: " << nwrite_buffers_bs);
00315
00316 typedef buf_ostream<block_type, typename ExtIterator_::bids_container_iterator> buf_ostream_type;
00317 buf_ostream_type out(first.bid(), nwrite_buffers_bs);
00318
00319 disk_queues::get_instance()->set_priority_op(request_queue::READ);
00320
00321 if (first.block_offset())
00322 {
00323
00324 block_type * block = new block_type;
00325 request_ptr req;
00326 req = block->read(*first.bid());
00327 req->wait();
00328
00329 for (i = 0; i < first.block_offset(); i++)
00330 {
00331 out << block->elem[i];
00332 }
00333 delete block;
00334 }
00335 block_type * blocks1 = new block_type[max_bucket_size_bl];
00336 block_type * blocks2 = new block_type[max_bucket_size_bl];
00337 request_ptr * reqs1 = new request_ptr[max_bucket_size_bl];
00338 request_ptr * reqs2 = new request_ptr[max_bucket_size_bl];
00339 type_key_ * refs1 = new type_key_[max_bucket_size_rec];
00340 type_key_ * refs2 = new type_key_[max_bucket_size_rec];
00341
00342
00343 unsigned_type nbucket_blocks = div_ceil(bucket_sizes[0], block_type::size);
00344 for (i = 0; i < nbucket_blocks; i++)
00345 reqs1[i] = blocks1[i].read(bucket_bids[0][i]);
00346
00347
00348 nbucket_blocks = div_ceil(bucket_sizes[1], block_type::size);
00349 for (i = 0; i < nbucket_blocks; i++)
00350 reqs2[i] = blocks2[i].read(bucket_bids[1][i]);
00351
00352
00353 key_type offset = 0;
00354 const unsigned log_k1 = STXXL_MAX<unsigned>(log2_ceil(max_bucket_size_rec * sizeof(type_key_) / STXXL_L2_SIZE), 1);
00355 unsigned_type k1 = 1 << log_k1;
00356 int_type * bucket1 = new int_type[k1];
00357
00358 const unsigned shift = sizeof(key_type) * 8 - lognbuckets;
00359 const unsigned shift1 = shift - log_k1;
00360
00361 STXXL_VERBOSE_STABLE_KSORT("Classifying " << nbuckets << " buckets, max size:" << max_bucket_size_rec <<
00362 " block size:" << block_type::size << " log_k1:" << log_k1);
00363
00364 for (unsigned_type k = 0; k < nbuckets; k++)
00365 {
00366 nbucket_blocks = div_ceil(bucket_sizes[k], block_type::size);
00367 const unsigned log_k1_k = STXXL_MAX<unsigned>(log2_ceil(bucket_sizes[k] * sizeof(type_key_) / STXXL_L2_SIZE), 1);
00368 assert(log_k1_k <= log_k1);
00369 k1 = 1 << log_k1_k;
00370 std::fill(bucket1, bucket1 + k1, 0);
00371
00372 STXXL_VERBOSE_STABLE_KSORT("Classifying bucket " << k << " size:" << bucket_sizes[k] <<
00373 " blocks:" << nbucket_blocks << " log_k1:" << log_k1_k);
00374
00375 type_key_ * ref_ptr = refs1;
00376 key_type offset1 = offset + (key_type(1) << key_type(shift)) * key_type(k);
00377 for (i = 0; i < nbucket_blocks - 1; i++)
00378 {
00379 reqs1[i]->wait();
00380 stable_ksort_local::classify_block(blocks1[i].begin(), blocks1[i].end(), ref_ptr, bucket1, offset1, shift1 );
00381 }
00382
00383 const unsigned_type last_block_size = bucket_sizes[k] - int64(nbucket_blocks - 1) * block_type::size;
00384 reqs1[i]->wait();
00385
00386
00387
00388 classify_block(blocks1[i].begin(), blocks1[i].begin() + last_block_size, ref_ptr, bucket1, offset1, shift1);
00389
00390 exclusive_prefix_sum(bucket1, k1);
00391 classify(refs1, refs1 + bucket_sizes[k], refs2, bucket1, offset1, shift1);
00392
00393 type_key_ * c = refs2;
00394 type_key_ * d = refs1;
00395 for (i = 0; i < k1; i++)
00396 {
00397 type_key_ * cEnd = refs2 + bucket1[i];
00398 type_key_ * dEnd = refs1 + bucket1[i];
00399
00400 const unsigned log_k2 = log2_floor(bucket1[i]) - 1;
00401 const unsigned_type k2 = 1 << log_k2;
00402 int_type * bucket2 = new int_type[k2];
00403 const unsigned shift2 = shift1 - log_k2;
00404
00405
00406 l1sort(c, cEnd, d, bucket2, k2,
00407 offset1 + (key_type(1) << key_type(shift1)) * key_type(i),
00408 shift2);
00409
00410
00411 for (type_key_ * p = d; p < dEnd; p++)
00412 out << (*(p->ptr));
00413
00414
00415 delete[] bucket2;
00416 c = cEnd;
00417 d = dEnd;
00418 }
00419
00420 const unsigned_type bucket2submit = k + 2;
00421 if (bucket2submit < nbuckets)
00422 {
00423 nbucket_blocks = div_ceil(bucket_sizes[bucket2submit], block_type::size);
00424 for (i = 0; i < nbucket_blocks; i++)
00425 reqs1[i] = blocks1[i].read(bucket_bids[bucket2submit][i]);
00426 }
00427
00428 std::swap(blocks1, blocks2);
00429 std::swap(reqs1, reqs2);
00430 }
00431
00432 delete[] bucket1;
00433 delete[] refs1;
00434 delete[] refs2;
00435 delete[] blocks1;
00436 delete[] blocks2;
00437 delete[] reqs1;
00438 delete[] reqs2;
00439 delete[] bucket_bids;
00440 delete[] bucket_sizes;
00441
00442 if (last.block_offset())
00443 {
00444
00445 block_type * block = new block_type;
00446 request_ptr req = block->read(*last.bid());
00447 req->wait();
00448
00449 for (i = last.block_offset(); i < block_type::size; i++)
00450 {
00451 out << block->elem[i];
00452 }
00453 delete block;
00454 }
00455
00456 end = timestamp();
00457 }
00458
00459 STXXL_VERBOSE("Elapsed time : " << end - begin << " s. Distribution time: " <<
00460 dist_end - begin << " s");
00461 STXXL_VERBOSE("Time in I/O wait(ds): " << io_wait_after_d << " s");
00462 STXXL_VERBOSE(*stats::get_instance());
00463 STXXL_UNUSED(begin + dist_end + end + io_wait_after_d);
00464 }
00465
00467
00468 __STXXL_END_NAMESPACE
00469
00470 #endif // !STXXL_STABLE_KSORT_HEADER